CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. It's particularly useful in parallel decomposition scenarios where tasks need to synchronize at specific points.
1. CyclicBarrier Fundamentals
Key Concepts:
- Barrier Point: The synchronization point where all threads must wait
- Parties: The number of threads that must invoke
await()before the barrier trips - Cyclic: Can be reused after the waiting threads are released
- Barrier Action: Optional Runnable that executes when the barrier trips
How It Works:
- Threads call
await()when they reach the barrier - They block until all parties reach the barrier
- Once all threads arrive, the barrier trips and all threads are released
- The barrier resets and can be used again
2. Basic CyclicBarrier Usage
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierBasicExample {
public static void main(String[] args) {
basicUsageExample();
}
public static void basicUsageExample() {
System.out.println("=== Basic CyclicBarrier Usage ===");
// Create a CyclicBarrier for 3 threads
int numberOfParties = 3;
CyclicBarrier barrier = new CyclicBarrier(numberOfParties,
() -> System.out.println("๐ All threads reached the barrier! Barrier tripped.\n"));
// Create and start worker threads
for (int i = 1; i <= numberOfParties; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " started its work");
// Simulate some work
Thread.sleep(threadId * 1000L);
System.out.println("Thread " + threadId + " completed work and reached barrier");
// Wait at the barrier
int arrivalIndex = barrier.await();
System.out.println("Thread " + threadId + " continued after barrier " +
"(arrival index: " + arrivalIndex + ")");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
System.err.println("Thread " + threadId + " was interrupted");
}
}).start();
}
}
}
Output:
=== Basic CyclicBarrier Usage === Thread 1 started its work Thread 2 started its work Thread 3 started its work Thread 1 completed work and reached barrier Thread 2 completed work and reached barrier Thread 3 completed work and reached barrier ๐ All threads reached the barrier! Barrier tripped. Thread 3 continued after barrier (arrival index: 2) Thread 1 continued after barrier (arrival index: 0) Thread 2 continued after barrier (arrival index: 1)
3. Advanced CyclicBarrier Features
Timeout and Broken Barrier Handling
import java.util.concurrent.*;
public class CyclicBarrierTimeoutExample {
public static void main(String[] args) {
timeoutHandlingExample();
brokenBarrierExample();
}
public static void timeoutHandlingExample() {
System.out.println("=== Timeout Handling ===");
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties,
() -> System.out.println("Barrier tripped!"));
for (int i = 1; i <= parties; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " started");
if (threadId == 3) {
// Thread 3 takes longer and times out
Thread.sleep(5000);
} else {
Thread.sleep(1000);
}
System.out.println("Thread " + threadId + " reaching barrier");
// Wait with timeout
try {
int index = barrier.await(2, TimeUnit.SECONDS);
System.out.println("Thread " + threadId + " passed barrier (index: " + index + ")");
} catch (TimeoutException e) {
System.err.println("Thread " + threadId + " timed out waiting at barrier");
// The barrier becomes broken when timeout occurs
}
} catch (InterruptedException | BrokenBarrierException e) {
System.err.println("Thread " + threadId + " interrupted or barrier broken");
Thread.currentThread().interrupt();
}
}).start();
}
}
public static void brokenBarrierExample() {
System.out.println("\n=== Broken Barrier Handling ===");
CyclicBarrier barrier = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
System.out.println("Thread 1 reached barrier");
barrier.await();
System.out.println("Thread 1 passed barrier");
} catch (Exception e) {
System.err.println("Thread 1: " + e.getClass().getSimpleName());
}
});
Thread t2 = new Thread(() -> {
try {
System.out.println("Thread 2 reached barrier");
Thread.sleep(2000); // Simulate delay
barrier.await();
System.out.println("Thread 2 passed barrier");
} catch (Exception e) {
System.err.println("Thread 2: " + e.getClass().getSimpleName());
}
});
Thread t3 = new Thread(() -> {
try {
System.out.println("Thread 3 reached barrier but will interrupt");
Thread.sleep(1000);
// This will break the barrier for other threads
throw new RuntimeException("Simulated failure");
} catch (Exception e) {
System.err.println("Thread 3 interrupted barrier");
// Reset barrier to make it usable again
barrier.reset();
}
});
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Barrier is broken: " + (barrier.isBroken()));
System.out.println("Parties waiting: " + barrier.getNumberWaiting());
}
}
4. Real-World Use Cases
Use Case 1: Multi-Phase Data Processing
import java.util.concurrent.*;
import java.util.*;
public class MultiPhaseDataProcessing {
public static class DataProcessor {
private final CyclicBarrier processingBarrier;
private final int numberOfWorkers;
private final List<List<Integer>> partialResults;
private final List<Integer> finalResult;
public DataProcessor(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
this.partialResults = Collections.synchronizedList(new ArrayList<>());
this.finalResult = Collections.synchronizedList(new ArrayList<>());
// Barrier for synchronization between phases
this.processingBarrier = new CyclicBarrier(numberOfWorkers,
() -> System.out.println("=== Phase completed ==="));
}
public void processData(List<Integer> data) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(numberOfWorkers);
int chunkSize = data.size() / numberOfWorkers;
// Phase 1: Process data in parallel
System.out.println("=== Starting Phase 1: Data Processing ===");
for (int i = 0; i < numberOfWorkers; i++) {
final int workerId = i;
int start = i * chunkSize;
int end = (i == numberOfWorkers - 1) ? data.size() : (i + 1) * chunkSize;
List<Integer> chunk = data.subList(start, end);
executor.execute(() -> processChunkPhase1(workerId, chunk));
}
// Wait for phase 1 completion
Thread.sleep(2000);
// Phase 2: Merge partial results
System.out.println("\n=== Starting Phase 2: Result Merging ===");
for (int i = 0; i < numberOfWorkers; i++) {
final int workerId = i;
executor.execute(() -> processChunkPhase2(workerId));
}
// Wait for completion
Thread.sleep(1000);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("\n=== Final Results ===");
System.out.println("Final result size: " + finalResult.size());
System.out.println("Final result sum: " + finalResult.stream().mapToInt(Integer::intValue).sum());
}
private void processChunkPhase1(int workerId, List<Integer> chunk) {
try {
System.out.println("Worker " + workerId + " processing " + chunk.size() + " items");
// Simulate processing
Thread.sleep(500 + workerId * 100L);
// Process data (square each number)
List<Integer> processed = chunk.stream()
.map(x -> x * x)
.toList();
synchronized (partialResults) {
partialResults.add(processed);
}
System.out.println("Worker " + workerId + " completed phase 1");
processingBarrier.await(); // Wait for all workers
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void processChunkPhase2(int workerId) {
try {
System.out.println("Worker " + workerId + " starting phase 2");
// Each worker processes a portion of the partial results
List<Integer> workerResults = new ArrayList<>();
synchronized (partialResults) {
for (List<Integer> partial : partialResults) {
if (partial.size() > workerId) {
workerResults.add(partial.get(workerId % partial.size()));
}
}
}
// Simulate processing
Thread.sleep(300);
synchronized (finalResult) {
finalResult.addAll(workerResults);
}
System.out.println("Worker " + workerId + " completed phase 2 with " +
workerResults.size() + " results");
processingBarrier.await(); // Wait for all workers
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws Exception {
// Generate test data
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
data.add(i);
}
DataProcessor processor = new DataProcessor(4);
processor.processData(data);
}
}
Use Case 2: Parallel Matrix Operations
import java.util.concurrent.*;
import java.util.*;
public class ParallelMatrixOperations {
public static class MatrixProcessor {
private final CyclicBarrier barrier;
private final double[][] matrix;
private final double[][] result;
private final int workers;
public MatrixProcessor(double[][] matrix, int workers) {
this.matrix = matrix;
this.workers = workers;
this.result = new double[matrix.length][matrix[0].length];
this.barrier = new CyclicBarrier(workers, this::combineResults);
}
public double[][] process() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(workers);
int rowsPerWorker = matrix.length / workers;
System.out.println("Starting parallel matrix processing with " + workers + " workers");
// Submit tasks for each worker
for (int i = 0; i < workers; i++) {
final int workerId = i;
int startRow = i * rowsPerWorker;
int endRow = (i == workers - 1) ? matrix.length : (i + 1) * rowsPerWorker;
executor.execute(() -> processRows(workerId, startRow, endRow));
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return result;
}
private void processRows(int workerId, int startRow, int endRow) {
try {
System.out.println("Worker " + workerId + " processing rows " +
startRow + " to " + (endRow - 1));
// Phase 1: Process rows (apply some transformation)
for (int i = startRow; i < endRow; i++) {
for (int j = 0; j < matrix[i].length; j++) {
result[i][j] = Math.sin(matrix[i][j]) + Math.cos(matrix[i][j]);
}
}
System.out.println("Worker " + workerId + " completed phase 1");
barrier.await(); // Wait for all workers to complete phase 1
// Phase 2: Additional processing (normalize values)
for (int i = startRow; i < endRow; i++) {
double rowSum = 0;
for (int j = 0; j < matrix[i].length; j++) {
rowSum += result[i][j];
}
for (int j = 0; j < matrix[i].length; j++) {
result[i][j] /= rowSum;
}
}
System.out.println("Worker " + workerId + " completed phase 2");
barrier.await(); // Wait for all workers to complete phase 2
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void combineResults() {
// This runs when all threads reach the barrier
System.out.println("All workers reached synchronization point");
System.out.println("Combining results...");
// Can perform global operations here
double globalSum = 0;
for (double[] row : result) {
for (double value : row) {
globalSum += value;
}
}
System.out.println("Global sum of results: " + globalSum);
}
}
public static void main(String[] args) throws Exception {
// Create a sample matrix
double[][] matrix = new double[6][4];
Random random = new Random();
for (int i = 0; i < matrix.length; i++) {
for (int j = 0; j < matrix[i].length; j++) {
matrix[i][j] = random.nextDouble() * 10;
}
}
MatrixProcessor processor = new MatrixProcessor(matrix, 3);
double[][] result = processor.process();
System.out.println("\nProcessing completed successfully");
System.out.println("Result matrix dimensions: " + result.length + "x" + result[0].length);
}
}
5. Complex Multi-Stage Workflow
import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.atomic.*;
public class MultiStageWorkflow {
public static class WorkflowProcessor {
private final CyclicBarrier stageBarrier;
private final int numberOfStages;
private final int workersPerStage;
private final AtomicInteger currentStage;
private final List<AtomicInteger> workerProgress;
public WorkflowProcessor(int stages, int workers) {
this.numberOfStages = stages;
this.workersPerStage = workers;
this.currentStage = new AtomicInteger(1);
this.workerProgress = new ArrayList<>();
for (int i = 0; i < workers; i++) {
workerProgress.add(new AtomicInteger(0));
}
// Barrier with stage completion action
this.stageBarrier = new CyclicBarrier(workers, this::onStageCompletion);
}
public void executeWorkflow() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(workersPerStage);
System.out.println("๐ Starting workflow with " + numberOfStages + " stages and " +
workersPerStage + " workers");
// Submit workers
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < workersPerStage; i++) {
final int workerId = i;
futures.add(executor.submit(() -> executeWorker(workerId)));
}
// Wait for completion
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
System.err.println("Worker execution failed: " + e.getCause());
}
}
executor.shutdown();
System.out.println("โ
Workflow completed successfully");
}
private void executeWorker(int workerId) {
try {
while (currentStage.get() <= numberOfStages) {
int stage = currentStage.get();
System.out.println("Worker " + workerId + " starting stage " + stage);
// Simulate stage work
performStageWork(workerId, stage);
workerProgress.get(workerId).set(stage);
System.out.println("Worker " + workerId + " completed stage " + stage);
// Wait for all workers to complete this stage
stageBarrier.await();
}
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void performStageWork(int workerId, int stage) throws InterruptedException {
// Different work for different stages
switch (stage) {
case 1:
// Data loading stage
Thread.sleep(1000 + workerId * 100L);
break;
case 2:
// Data processing stage
Thread.sleep(1500 + workerId * 150L);
break;
case 3:
// Data validation stage
Thread.sleep(800 + workerId * 80L);
break;
case 4:
// Result compilation stage
Thread.sleep(1200 + workerId * 120L);
break;
default:
Thread.sleep(500);
}
}
private void onStageCompletion() {
int completedStage = currentStage.getAndIncrement();
System.out.println("\n๐ฏ ===== STAGE " + completedStage + " COMPLETED =====");
// Print progress report
System.out.print("Worker progress: ");
for (int i = 0; i < workerProgress.size(); i++) {
System.out.print("W" + i + "=" + workerProgress.get(i).get() + " ");
}
System.out.println("\n");
if (completedStage < numberOfStages) {
System.out.println("๐ Proceeding to stage " + currentStage.get());
}
}
}
public static void main(String[] args) throws Exception {
WorkflowProcessor processor = new WorkflowProcessor(4, 4);
processor.executeWorkflow();
}
}
6. Error Handling and Recovery
import java.util.concurrent.*;
import java.util.*;
public class RobustCyclicBarrierExample {
public static class ResilientProcessor {
private CyclicBarrier barrier;
private final int parties;
private final AtomicInteger errorCount;
private final List<CompletableFuture<Void>> workerFutures;
public ResilientProcessor(int parties) {
this.parties = parties;
this.errorCount = new AtomicInteger(0);
this.workerFutures = new ArrayList<>();
createBarrier();
}
private void createBarrier() {
this.barrier = new CyclicBarrier(parties, () -> {
System.out.println("โ
Barrier tripped successfully");
errorCount.set(0); // Reset error count on successful barrier trip
});
}
public void processWithRetry(List<Runnable> tasks, int maxRetries) {
if (tasks.size() != parties) {
throw new IllegalArgumentException("Number of tasks must match parties");
}
for (int retry = 0; retry <= maxRetries; retry++) {
System.out.println("\n=== Attempt " + (retry + 1) + " ===");
if (executeTasks(tasks)) {
System.out.println("๐ All tasks completed successfully!");
return;
} else {
System.out.println("โ Attempt failed. " + errorCount.get() + " errors occurred.");
if (retry < maxRetries) {
System.out.println("๐ Retrying...");
resetBarrier();
}
}
}
System.out.println("๐ฅ Maximum retries exceeded. Processing failed.");
}
private boolean executeTasks(List<Runnable> tasks) {
workerFutures.clear();
for (int i = 0; i < tasks.size(); i++) {
final int taskId = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Task " + taskId + " started");
tasks.get(taskId).run();
System.out.println("Task " + taskId + " completed work, waiting at barrier");
barrier.await(5, TimeUnit.SECONDS); // Wait with timeout
System.out.println("Task " + taskId + " passed barrier");
} catch (TimeoutException e) {
System.err.println("Task " + taskId + " timed out at barrier");
errorCount.incrementAndGet();
barrier.reset(); // Reset barrier on timeout
} catch (BrokenBarrierException e) {
System.err.println("Task " + taskId + " found broken barrier");
errorCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Task " + taskId + " interrupted");
} catch (Exception e) {
System.err.println("Task " + taskId + " failed: " + e.getMessage());
errorCount.incrementAndGet();
barrier.reset(); // Reset barrier on failure
}
});
workerFutures.add(future);
}
// Wait for all tasks to complete
try {
CompletableFuture.allOf(workerFutures.toArray(new CompletableFuture[0]))
.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.err.println("Overall execution timed out");
return false;
} catch (Exception e) {
System.err.println("Execution failed: " + e.getMessage());
return false;
}
return errorCount.get() == 0 && !barrier.isBroken();
}
private void resetBarrier() {
if (barrier.isBroken()) {
System.out.println("Resetting broken barrier...");
barrier.reset();
}
errorCount.set(0);
}
}
public static void main(String[] args) {
List<Runnable> tasks = Arrays.asList(
() -> {
try {
Thread.sleep(1000);
System.out.println("Task 0: Processed data chunk A");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try {
Thread.sleep(2000);
System.out.println("Task 1: Processed data chunk B");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try {
// Simulate occasional failure
if (Math.random() > 0.7) {
throw new RuntimeException("Simulated processing error");
}
Thread.sleep(1500);
System.out.println("Task 2: Processed data chunk C");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try {
Thread.sleep(1800);
System.out.println("Task 3: Processed data chunk D");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
);
ResilientProcessor processor = new ResilientProcessor(4);
processor.processWithRetry(tasks, 3);
}
}
7. Performance Monitoring and Metrics
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class MonitoredCyclicBarrier {
public static class BarrierMetrics {
private final LongAdder totalWaits = new LongAdder();
private final LongAdder successfulWaits = new LongAdder();
private final LongAdder timeoutWaits = new LongAdder();
private final LongAdder brokenBarrierWaits = new LongAdder();
private final LongSummaryStatistics waitTimeStats = new LongSummaryStatistics();
public void recordWait(long waitTime, boolean success, boolean timeout, boolean broken) {
totalWaits.increment();
if (success) successfulWaits.increment();
if (timeout) timeoutWaits.increment();
if (broken) brokenBarrierWaits.increment();
waitTimeStats.accept(waitTime);
}
public void printMetrics() {
System.out.println("\n=== Barrier Metrics ===");
System.out.println("Total waits: " + totalWaits.sum());
System.out.println("Successful: " + successfulWaits.sum());
System.out.println("Timeouts: " + timeoutWaits.sum());
System.out.println("Broken barriers: " + brokenBarrierWaits.sum());
System.out.println("Wait time - Avg: " + waitTimeStats.getAverage() + "ms, " +
"Min: " + waitTimeStats.getMin() + "ms, " +
"Max: " + waitTimeStats.getMax() + "ms");
}
}
public static class MonitoredBarrier {
private final CyclicBarrier barrier;
private final BarrierMetrics metrics;
private final int parties;
public MonitoredBarrier(int parties, Runnable barrierAction, BarrierMetrics metrics) {
this.parties = parties;
this.metrics = metrics;
this.barrier = new CyclicBarrier(parties, barrierAction);
}
public int awaitWithMetrics(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, BrokenBarrierException {
long startTime = System.currentTimeMillis();
boolean success = false;
boolean timeoutOccurred = false;
boolean brokenOccurred = false;
try {
int result = barrier.await(timeout, unit);
success = true;
return result;
} catch (TimeoutException e) {
timeoutOccurred = true;
throw e;
} catch (BrokenBarrierException e) {
brokenOccurred = true;
throw e;
} finally {
long waitTime = System.currentTimeMillis() - startTime;
metrics.recordWait(waitTime, success, timeoutOccurred, brokenOccurred);
}
}
public boolean isBroken() {
return barrier.isBroken();
}
public void reset() {
barrier.reset();
}
public int getNumberWaiting() {
return barrier.getNumberWaiting();
}
}
public static void main(String[] args) throws Exception {
BarrierMetrics metrics = new BarrierMetrics();
int parties = 4;
MonitoredBarrier monitoredBarrier = new MonitoredBarrier(parties,
() -> System.out.println("Barrier action executed at " + System.currentTimeMillis()),
metrics);
ExecutorService executor = Executors.newFixedThreadPool(parties);
for (int i = 0; i < 5; i++) { // Multiple barrier cycles
for (int j = 0; j < parties; j++) {
final int cycle = i;
final int workerId = j;
executor.execute(() -> {
try {
// Simulate variable work time
Thread.sleep(workerId * 100L + cycle * 50L);
System.out.printf("Cycle %d - Worker %d waiting at barrier%n", cycle, workerId);
monitoredBarrier.awaitWithMetrics(3, TimeUnit.SECONDS);
System.out.printf("Cycle %d - Worker %d passed barrier%n", cycle, workerId);
} catch (Exception e) {
System.err.printf("Cycle %d - Worker %d failed: %s%n",
cycle, workerId, e.getMessage());
}
});
}
Thread.sleep(1000); // Space out cycles
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
metrics.printMetrics();
}
}
8. Best Practices and Patterns
Pattern 1: Pipeline Processing with CyclicBarrier
import java.util.concurrent.*;
import java.util.*;
public class PipelineProcessingPattern {
public static class PipelineStage<T> {
private final CyclicBarrier stageBarrier;
private final String stageName;
private final Processor<T> processor;
public PipelineStage(String name, int parallelism, Processor<T> processor) {
this.stageName = name;
this.processor = processor;
this.stageBarrier = new CyclicBarrier(parallelism,
() -> System.out.println("[" + name + "] Stage completed"));
}
public List<T> process(List<T> inputs) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(stageBarrier.getParties());
List<Future<T>> futures = new ArrayList<>();
System.out.println("[" + stageName + "] Starting processing of " + inputs.size() + " items");
// Submit processing tasks
for (T input : inputs) {
Future<T> future = executor.submit(() -> {
T result = processor.process(input);
stageBarrier.await(); // Synchronize after each batch
return result;
});
futures.add(future);
}
// Collect results
List<T> results = new ArrayList<>();
for (Future<T> future : futures) {
results.add(future.get());
}
executor.shutdown();
return results;
}
}
public interface Processor<T> {
T process(T input) throws Exception;
}
public static void main(String[] args) throws Exception {
// Create pipeline stages
PipelineStage<String> validationStage = new PipelineStage<>("VALIDATION", 2,
input -> {
Thread.sleep(100);
if (input == null || input.trim().isEmpty()) {
throw new IllegalArgumentException("Invalid input");
}
return input.toUpperCase();
});
PipelineStage<String> processingStage = new PipelineStage<>("PROCESSING", 3,
input -> {
Thread.sleep(200);
return input + "-PROCESSED";
});
PipelineStage<String> formattingStage = new PipelineStage<>("FORMATTING", 2,
input -> {
Thread.sleep(150);
return "[" + input + "]";
});
// Process data through pipeline
List<String> inputs = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");
System.out.println("Starting pipeline processing...");
List<String> validated = validationStage.process(inputs);
System.out.println("After validation: " + validated);
List<String> processed = processingStage.process(validated);
System.out.println("After processing: " + processed);
List<String> formatted = formattingStage.process(processed);
System.out.println("Final results: " + formatted);
}
}
Conclusion
Key Benefits of CyclicBarrier:
- Synchronization: Perfect for coordinating multiple threads at specific points
- Reusability: Can be used multiple times unlike CountDownLatch
- Flexibility: Supports barrier actions and timeouts
- Parallel Processing: Ideal for parallel algorithms with synchronization points
Common Use Cases:
- Parallel Algorithms: Divide and conquer problems
- Multi-phase Processing: Data processing pipelines
- Simulations: Synchronize simulation steps
- Testing: Coordinate test execution
- Batch Processing: Synchronize batch operations
Best Practices:
- Handle Exceptions Properly: Always handle
BrokenBarrierExceptionandInterruptedException - Use Timeouts: Prevent permanent blocking with
await(timeout, unit) - Monitor Barrier State: Check
isBroken()and usereset()when needed - Choose Appropriate Party Count: Match the number of worker threads
- Keep Barrier Actions Light: Avoid long-running operations in barrier actions
CyclicBarrier is a powerful synchronization tool that enables complex coordination patterns in concurrent Java applications, making it essential for parallel processing and multi-stage workflows.