CyclicBarrier for Group Synchronization in Java: Complete Guide

CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. It's particularly useful in parallel decomposition scenarios where tasks need to synchronize at specific points.


1. CyclicBarrier Fundamentals

Key Concepts:

  • Barrier Point: The synchronization point where all threads must wait
  • Parties: The number of threads that must invoke await() before the barrier trips
  • Cyclic: Can be reused after the waiting threads are released
  • Barrier Action: Optional Runnable that executes when the barrier trips
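One detail about the barrier action is easy to miss: it runs once per trip, on the thread that arrives last, before any waiting thread is released. The following is a minimal sketch of that behaviour, not a definitive implementation; the class name `BarrierActionThreadSketch`, the method `actionThreadName`, and the thread name `early-arriver` are illustrative assumptions.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

public class BarrierActionThreadSketch {

    // Returns the name of the thread that ran the barrier action when the
    // calling thread is the last of two parties to arrive.
    public static String actionThreadName() throws Exception {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> ranOn.set(Thread.currentThread().getName()));

        Thread early = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception e) {
                // Interrupted or broken; nothing to clean up in this sketch
            }
        }, "early-arriver");
        early.start();

        // Poll until the first party is parked at the barrier
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }
        barrier.await(); // The calling thread is the last to arrive
        early.join();
        return ranOn.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Barrier action ran on: " + actionThreadName());
    }
}
```

Because the action runs on a worker thread while every other party is still blocked, a slow action delays all of them.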

How It Works:

  1. Threads call await() when they reach the barrier
  2. They block until all parties reach the barrier
  3. Once all threads arrive, the barrier trips and all threads are released
  4. The barrier resets and can be used again
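The four steps above can be sketched as a small program in which the same barrier is used for several rounds. This is an illustrative sketch under assumed names (`BarrierReuseSketch`, `runRounds`); it counts trips with the barrier action to show that the barrier resets on its own after each release.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseSketch {

    // Runs `parties` threads through `rounds` barrier cycles and returns
    // how many times the barrier tripped (counted by the barrier action).
    public static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // Steps 1-3: arrive, block, get released
                        // Step 4: the same barrier is ready for the next round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed; give up on the remaining rounds
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Barrier tripped " + runRounds(3, 2) + " times"); // prints 2 times
    }
}
```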

2. Basic CyclicBarrier Usage

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierBasicExample {
public static void main(String[] args) {
basicUsageExample();
}
public static void basicUsageExample() {
System.out.println("=== Basic CyclicBarrier Usage ===");
// Create a CyclicBarrier for 3 threads
int numberOfParties = 3;
CyclicBarrier barrier = new CyclicBarrier(numberOfParties, 
() -> System.out.println("🎉 All threads reached the barrier! Barrier tripped.\n"));
// Create and start worker threads
for (int i = 1; i <= numberOfParties; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " started its work");
// Simulate some work
Thread.sleep(threadId * 1000L);
System.out.println("Thread " + threadId + " completed work and reached barrier");
// Wait at the barrier
int arrivalIndex = barrier.await();
System.out.println("Thread " + threadId + " continued after barrier " + 
"(arrival index: " + arrivalIndex + ")");
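} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupt flag
System.err.println("Thread " + threadId + " was interrupted");
} catch (BrokenBarrierException e) {
System.err.println("Thread " + threadId + " found the barrier broken");
}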
}).start();
}
}
}

Sample output (the exact interleaving can vary between runs):

=== Basic CyclicBarrier Usage ===
Thread 1 started its work
Thread 2 started its work
Thread 3 started its work
Thread 1 completed work and reached barrier
Thread 2 completed work and reached barrier
Thread 3 completed work and reached barrier
🎉 All threads reached the barrier! Barrier tripped.
Thread 3 continued after barrier (arrival index: 0)
Thread 1 continued after barrier (arrival index: 2)
Thread 2 continued after barrier (arrival index: 1)
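The value returned by await() is the arrival index: getParties() - 1 for the first thread to arrive and 0 for the last. The sketch below forces a fixed arrival order to make this visible; the class and method names (`ArrivalIndexSketch`, `arrivalIndices`) are assumptions for illustration, and the polling loop is only a simple way to order the arrivals in a demo.

```java
import java.util.concurrent.CyclicBarrier;

public class ArrivalIndexSketch {

    // Lets three parties arrive one after another and returns the index
    // that await() handed to each, in order of arrival.
    public static int[] arrivalIndices() throws Exception {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties);
        int[] indices = new int[parties];
        Thread[] early = new Thread[parties - 1];

        for (int i = 0; i < parties - 1; i++) {
            final int order = i;
            early[i] = new Thread(() -> {
                try {
                    indices[order] = barrier.await();
                } catch (Exception e) {
                    indices[order] = -1; // Interrupted or barrier broken
                }
            });
            early[i].start();
            // Poll until this thread is parked at the barrier, so the
            // arrival order is fixed before the next party starts
            while (barrier.getNumberWaiting() < i + 1) {
                Thread.sleep(10);
            }
        }

        indices[parties - 1] = barrier.await(); // The calling thread arrives last
        for (Thread t : early) {
            t.join(); // join() makes the workers' array writes visible here
        }
        return indices;
    }

    public static void main(String[] args) throws Exception {
        int[] idx = arrivalIndices();
        // prints first=2, second=1, last=0
        System.out.println("first=" + idx[0] + ", second=" + idx[1] + ", last=" + idx[2]);
    }
}
```

A common use of the index is to pick one thread (for example, the one that receives 0) to do per-cycle housekeeping after the barrier.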

3. Advanced CyclicBarrier Features

Timeout and Broken Barrier Handling

import java.util.concurrent.*;
public class CyclicBarrierTimeoutExample {
public static void main(String[] args) {
timeoutHandlingExample();
brokenBarrierExample();
}
public static void timeoutHandlingExample() {
System.out.println("=== Timeout Handling ===");
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, 
() -> System.out.println("Barrier tripped!"));
for (int i = 1; i <= parties; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " started");
if (threadId == 3) {
// Thread 3 arrives late, so a thread already waiting at the barrier times out first
Thread.sleep(5000);
} else {
Thread.sleep(1000);
}
System.out.println("Thread " + threadId + " reaching barrier");
// Wait with timeout
try {
int index = barrier.await(2, TimeUnit.SECONDS);
System.out.println("Thread " + threadId + " passed barrier (index: " + index + ")");
} catch (TimeoutException e) {
System.err.println("Thread " + threadId + " timed out waiting at barrier");
// The barrier becomes broken when timeout occurs
}
} catch (InterruptedException | BrokenBarrierException e) {
System.err.println("Thread " + threadId + " interrupted or barrier broken");
Thread.currentThread().interrupt();
}
}).start();
}
}
public static void brokenBarrierExample() {
System.out.println("\n=== Broken Barrier Handling ===");
CyclicBarrier barrier = new CyclicBarrier(3);
Thread t1 = new Thread(() -> {
try {
System.out.println("Thread 1 reached barrier");
barrier.await();
System.out.println("Thread 1 passed barrier");
} catch (Exception e) {
System.err.println("Thread 1: " + e.getClass().getSimpleName());
}
});
Thread t2 = new Thread(() -> {
try {
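// Short delay only: arriving after Thread 3 calls reset() would block forever,
// because the fresh barrier would never see all three parties
Thread.sleep(500);
System.out.println("Thread 2 reached barrier");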
barrier.await();
System.out.println("Thread 2 passed barrier");
} catch (Exception e) {
System.err.println("Thread 2: " + e.getClass().getSimpleName());
}
});
Thread t3 = new Thread(() -> {
try {
System.out.println("Thread 3 will fail before reaching the barrier");
Thread.sleep(1000);
// Failing here does not break the barrier by itself; the waiting threads stay blocked
throw new RuntimeException("Simulated failure");
} catch (Exception e) {
System.err.println("Thread 3 failed; resetting barrier");
// reset() releases current waiters with BrokenBarrierException
// and returns the barrier to its initial state
barrier.reset();
}
});
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Barrier is broken: " + (barrier.isBroken()));
System.out.println("Parties waiting: " + barrier.getNumberWaiting());
}
}
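The broken state used in this section can also be observed in isolation. The sketch below, with assumed names `BrokenStateSketch` and `timeoutThenReset`, lets a single thread time out on a two-party barrier, then checks isBroken() before and after reset().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BrokenStateSketch {

    // Returns {timedOut, brokenAfterTimeout, brokenAfterReset}.
    public static boolean[] timeoutThenReset() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // The second party never arrives
        boolean timedOut = false;
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedOut = true; // This thread's wait expired, which breaks the barrier
        } catch (BrokenBarrierException e) {
            // Not expected here: no other party exists to break the barrier
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        barrier.reset(); // Back to the initial, unbroken state
        boolean brokenAfterReset = barrier.isBroken();
        return new boolean[] { timedOut, brokenAfterTimeout, brokenAfterReset };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = timeoutThenReset();
        // prints timedOut=true, broken=true, afterReset=false
        System.out.println("timedOut=" + r[0] + ", broken=" + r[1] + ", afterReset=" + r[2]);
    }
}
```

Note that reset() is only safe to call when the remaining parties are coordinated some other way; threads that arrive after the reset start a new cycle and need a full set of parties to proceed.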

4. Real-World Use Cases

Use Case 1: Multi-Phase Data Processing

import java.util.concurrent.*;
import java.util.*;
public class MultiPhaseDataProcessing {
public static class DataProcessor {
private final CyclicBarrier processingBarrier;
private final int numberOfWorkers;
private final List<List<Integer>> partialResults;
private final List<Integer> finalResult;
public DataProcessor(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
this.partialResults = Collections.synchronizedList(new ArrayList<>());
this.finalResult = Collections.synchronizedList(new ArrayList<>());
// Barrier for synchronization between phases
this.processingBarrier = new CyclicBarrier(numberOfWorkers, 
() -> System.out.println("=== Phase completed ==="));
}
public void processData(List<Integer> data) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(numberOfWorkers);
int chunkSize = data.size() / numberOfWorkers;
// Phase 1: Process data in parallel
System.out.println("=== Starting Phase 1: Data Processing ===");
for (int i = 0; i < numberOfWorkers; i++) {
final int workerId = i;
int start = i * chunkSize;
int end = (i == numberOfWorkers - 1) ? data.size() : (i + 1) * chunkSize;
List<Integer> chunk = data.subList(start, end);
executor.execute(() -> processChunkPhase1(workerId, chunk));
}
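// Crude pause so phase 1 finishes before phase 2 is submitted; the barrier
// synchronizes the workers with each other, not with this coordinating thread
Thread.sleep(2000);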
// Phase 2: Merge partial results
System.out.println("\n=== Starting Phase 2: Result Merging ===");
for (int i = 0; i < numberOfWorkers; i++) {
final int workerId = i;
executor.execute(() -> processChunkPhase2(workerId));
}
// Wait for completion
Thread.sleep(1000);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("\n=== Final Results ===");
System.out.println("Final result size: " + finalResult.size());
System.out.println("Final result sum: " + finalResult.stream().mapToInt(Integer::intValue).sum());
}
private void processChunkPhase1(int workerId, List<Integer> chunk) {
try {
System.out.println("Worker " + workerId + " processing " + chunk.size() + " items");
// Simulate processing
Thread.sleep(500 + workerId * 100L);
// Process data (square each number)
List<Integer> processed = chunk.stream()
.map(x -> x * x)
.toList();
synchronized (partialResults) {
partialResults.add(processed);
}
System.out.println("Worker " + workerId + " completed phase 1");
processingBarrier.await(); // Wait for all workers
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void processChunkPhase2(int workerId) {
try {
System.out.println("Worker " + workerId + " starting phase 2");
// Each worker processes a portion of the partial results
List<Integer> workerResults = new ArrayList<>();
synchronized (partialResults) {
for (List<Integer> partial : partialResults) {
if (partial.size() > workerId) {
workerResults.add(partial.get(workerId % partial.size()));
}
}
}
// Simulate processing
Thread.sleep(300);
synchronized (finalResult) {
finalResult.addAll(workerResults);
}
System.out.println("Worker " + workerId + " completed phase 2 with " + 
workerResults.size() + " results");
processingBarrier.await(); // Wait for all workers
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws Exception {
// Generate test data
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
data.add(i);
}
DataProcessor processor = new DataProcessor(4);
processor.processData(data);
}
}

Use Case 2: Parallel Matrix Operations

import java.util.concurrent.*;
import java.util.*;
public class ParallelMatrixOperations {
public static class MatrixProcessor {
private final CyclicBarrier barrier;
private final double[][] matrix;
private final double[][] result;
private final int workers;
public MatrixProcessor(double[][] matrix, int workers) {
this.matrix = matrix;
this.workers = workers;
this.result = new double[matrix.length][matrix[0].length];
this.barrier = new CyclicBarrier(workers, this::combineResults);
}
public double[][] process() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(workers);
int rowsPerWorker = matrix.length / workers;
System.out.println("Starting parallel matrix processing with " + workers + " workers");
// Submit tasks for each worker
for (int i = 0; i < workers; i++) {
final int workerId = i;
int startRow = i * rowsPerWorker;
int endRow = (i == workers - 1) ? matrix.length : (i + 1) * rowsPerWorker;
executor.execute(() -> processRows(workerId, startRow, endRow));
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return result;
}
private void processRows(int workerId, int startRow, int endRow) {
try {
System.out.println("Worker " + workerId + " processing rows " + 
startRow + " to " + (endRow - 1));
// Phase 1: Process rows (apply some transformation)
for (int i = startRow; i < endRow; i++) {
for (int j = 0; j < matrix[i].length; j++) {
result[i][j] = Math.sin(matrix[i][j]) + Math.cos(matrix[i][j]);
}
}
System.out.println("Worker " + workerId + " completed phase 1");
barrier.await(); // Wait for all workers to complete phase 1
// Phase 2: Additional processing (normalize values)
for (int i = startRow; i < endRow; i++) {
double rowSum = 0;
for (int j = 0; j < matrix[i].length; j++) {
rowSum += result[i][j];
}
for (int j = 0; j < matrix[i].length; j++) {
result[i][j] /= rowSum;
}
}
System.out.println("Worker " + workerId + " completed phase 2");
barrier.await(); // Wait for all workers to complete phase 2
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void combineResults() {
// This runs when all threads reach the barrier
System.out.println("All workers reached synchronization point");
System.out.println("Combining results...");
// Can perform global operations here
double globalSum = 0;
for (double[] row : result) {
for (double value : row) {
globalSum += value;
}
}
System.out.println("Global sum of results: " + globalSum);
}
}
public static void main(String[] args) throws Exception {
// Create a sample matrix
double[][] matrix = new double[6][4];
Random random = new Random();
for (int i = 0; i < matrix.length; i++) {
for (int j = 0; j < matrix[i].length; j++) {
matrix[i][j] = random.nextDouble() * 10;
}
}
MatrixProcessor processor = new MatrixProcessor(matrix, 3);
double[][] result = processor.process();
System.out.println("\nProcessing completed successfully");
System.out.println("Result matrix dimensions: " + result.length + "x" + result[0].length);
}
}

5. Complex Multi-Stage Workflow

import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.atomic.*;
public class MultiStageWorkflow {
public static class WorkflowProcessor {
private final CyclicBarrier stageBarrier;
private final int numberOfStages;
private final int workersPerStage;
private final AtomicInteger currentStage;
private final List<AtomicInteger> workerProgress;
public WorkflowProcessor(int stages, int workers) {
this.numberOfStages = stages;
this.workersPerStage = workers;
this.currentStage = new AtomicInteger(1);
this.workerProgress = new ArrayList<>();
for (int i = 0; i < workers; i++) {
workerProgress.add(new AtomicInteger(0));
}
// Barrier with stage completion action
this.stageBarrier = new CyclicBarrier(workers, this::onStageCompletion);
}
public void executeWorkflow() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(workersPerStage);
System.out.println("🚀 Starting workflow with " + numberOfStages + " stages and " + 
workersPerStage + " workers");
// Submit workers
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < workersPerStage; i++) {
final int workerId = i;
futures.add(executor.submit(() -> executeWorker(workerId)));
}
// Wait for completion
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
System.err.println("Worker execution failed: " + e.getCause());
}
}
executor.shutdown();
System.out.println("✅ Workflow completed successfully");
}
private void executeWorker(int workerId) {
try {
while (currentStage.get() <= numberOfStages) {
int stage = currentStage.get();
System.out.println("Worker " + workerId + " starting stage " + stage);
// Simulate stage work
performStageWork(workerId, stage);
workerProgress.get(workerId).set(stage);
System.out.println("Worker " + workerId + " completed stage " + stage);
// Wait for all workers to complete this stage
stageBarrier.await();
}
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
private void performStageWork(int workerId, int stage) throws InterruptedException {
// Different work for different stages
switch (stage) {
case 1:
// Data loading stage
Thread.sleep(1000 + workerId * 100L);
break;
case 2:
// Data processing stage
Thread.sleep(1500 + workerId * 150L);
break;
case 3:
// Data validation stage
Thread.sleep(800 + workerId * 80L);
break;
case 4:
// Result compilation stage
Thread.sleep(1200 + workerId * 120L);
break;
default:
Thread.sleep(500);
}
}
private void onStageCompletion() {
int completedStage = currentStage.getAndIncrement();
System.out.println("\n🎯 ===== STAGE " + completedStage + " COMPLETED =====");
// Print progress report
System.out.print("Worker progress: ");
for (int i = 0; i < workerProgress.size(); i++) {
System.out.print("W" + i + "=" + workerProgress.get(i).get() + " ");
}
System.out.println("\n");
if (completedStage < numberOfStages) {
System.out.println("🔄 Proceeding to stage " + currentStage.get());
}
}
}
public static void main(String[] args) throws Exception {
WorkflowProcessor processor = new WorkflowProcessor(4, 4);
processor.executeWorkflow();
}
}

6. Error Handling and Recovery

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
public class RobustCyclicBarrierExample {
public static class ResilientProcessor {
private CyclicBarrier barrier;
private final int parties;
private final AtomicInteger errorCount;
private final List<CompletableFuture<Void>> workerFutures;
public ResilientProcessor(int parties) {
this.parties = parties;
this.errorCount = new AtomicInteger(0);
this.workerFutures = new ArrayList<>();
createBarrier();
}
private void createBarrier() {
this.barrier = new CyclicBarrier(parties, () -> {
System.out.println("✅ Barrier tripped successfully");
errorCount.set(0); // Reset error count on successful barrier trip
});
}
public void processWithRetry(List<Runnable> tasks, int maxRetries) {
if (tasks.size() != parties) {
throw new IllegalArgumentException("Number of tasks must match parties");
}
for (int retry = 0; retry <= maxRetries; retry++) {
System.out.println("\n=== Attempt " + (retry + 1) + " ===");
if (executeTasks(tasks)) {
System.out.println("🎉 All tasks completed successfully!");
return;
} else {
System.out.println("❌ Attempt failed. " + errorCount.get() + " errors occurred.");
if (retry < maxRetries) {
System.out.println("🔄 Retrying...");
resetBarrier();
}
}
}
System.out.println("💥 Maximum retries exceeded. Processing failed.");
}
private boolean executeTasks(List<Runnable> tasks) {
workerFutures.clear();
for (int i = 0; i < tasks.size(); i++) {
final int taskId = i;
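// Caution: runAsync uses the common ForkJoinPool by default; if that pool has fewer
// threads than parties, the blocked tasks may never all arrive and will time out
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {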
try {
System.out.println("Task " + taskId + " started");
tasks.get(taskId).run();
System.out.println("Task " + taskId + " completed work, waiting at barrier");
barrier.await(5, TimeUnit.SECONDS); // Wait with timeout
System.out.println("Task " + taskId + " passed barrier");
} catch (TimeoutException e) {
System.err.println("Task " + taskId + " timed out at barrier");
errorCount.incrementAndGet();
barrier.reset(); // Reset barrier on timeout
} catch (BrokenBarrierException e) {
System.err.println("Task " + taskId + " found broken barrier");
errorCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Task " + taskId + " interrupted");
} catch (Exception e) {
System.err.println("Task " + taskId + " failed: " + e.getMessage());
errorCount.incrementAndGet();
barrier.reset(); // Reset barrier on failure
}
});
workerFutures.add(future);
}
// Wait for all tasks to complete
try {
CompletableFuture.allOf(workerFutures.toArray(new CompletableFuture[0]))
.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.err.println("Overall execution timed out");
return false;
} catch (Exception e) {
System.err.println("Execution failed: " + e.getMessage());
return false;
}
return errorCount.get() == 0 && !barrier.isBroken();
}
private void resetBarrier() {
if (barrier.isBroken()) {
System.out.println("Resetting broken barrier...");
barrier.reset();
}
errorCount.set(0);
}
}
public static void main(String[] args) {
List<Runnable> tasks = Arrays.asList(
() -> {
try { 
Thread.sleep(1000); 
System.out.println("Task 0: Processed data chunk A");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try { 
Thread.sleep(2000); 
System.out.println("Task 1: Processed data chunk B");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try { 
// Simulate occasional failure
if (Math.random() > 0.7) {
throw new RuntimeException("Simulated processing error");
}
Thread.sleep(1500); 
System.out.println("Task 2: Processed data chunk C");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
},
() -> {
try { 
Thread.sleep(1800); 
System.out.println("Task 3: Processed data chunk D");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
);
ResilientProcessor processor = new ResilientProcessor(4);
processor.processWithRetry(tasks, 3);
}
}

7. Performance Monitoring and Metrics

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class MonitoredCyclicBarrier {
public static class BarrierMetrics {
private final LongAdder totalWaits = new LongAdder();
private final LongAdder successfulWaits = new LongAdder();
private final LongAdder timeoutWaits = new LongAdder();
private final LongAdder brokenBarrierWaits = new LongAdder();
private final LongSummaryStatistics waitTimeStats = new LongSummaryStatistics();
public void recordWait(long waitTime, boolean success, boolean timeout, boolean broken) {
totalWaits.increment();
if (success) successfulWaits.increment();
if (timeout) timeoutWaits.increment();
if (broken) brokenBarrierWaits.increment();
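// LongSummaryStatistics is not thread-safe, so guard concurrent updates
synchronized (waitTimeStats) {
waitTimeStats.accept(waitTime);
}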
}
public void printMetrics() {
System.out.println("\n=== Barrier Metrics ===");
System.out.println("Total waits: " + totalWaits.sum());
System.out.println("Successful: " + successfulWaits.sum());
System.out.println("Timeouts: " + timeoutWaits.sum());
System.out.println("Broken barriers: " + brokenBarrierWaits.sum());
System.out.println("Wait time - Avg: " + waitTimeStats.getAverage() + "ms, " +
"Min: " + waitTimeStats.getMin() + "ms, " +
"Max: " + waitTimeStats.getMax() + "ms");
}
}
public static class MonitoredBarrier {
private final CyclicBarrier barrier;
private final BarrierMetrics metrics;
private final int parties;
public MonitoredBarrier(int parties, Runnable barrierAction, BarrierMetrics metrics) {
this.parties = parties;
this.metrics = metrics;
this.barrier = new CyclicBarrier(parties, barrierAction);
}
public int awaitWithMetrics(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException, BrokenBarrierException {
long startTime = System.currentTimeMillis();
boolean success = false;
boolean timeoutOccurred = false;
boolean brokenOccurred = false;
try {
int result = barrier.await(timeout, unit);
success = true;
return result;
} catch (TimeoutException e) {
timeoutOccurred = true;
throw e;
} catch (BrokenBarrierException e) {
brokenOccurred = true;
throw e;
} finally {
long waitTime = System.currentTimeMillis() - startTime;
metrics.recordWait(waitTime, success, timeoutOccurred, brokenOccurred);
}
}
public boolean isBroken() {
return barrier.isBroken();
}
public void reset() {
barrier.reset();
}
public int getNumberWaiting() {
return barrier.getNumberWaiting();
}
}
public static void main(String[] args) throws Exception {
BarrierMetrics metrics = new BarrierMetrics();
int parties = 4;
MonitoredBarrier monitoredBarrier = new MonitoredBarrier(parties, 
() -> System.out.println("Barrier action executed at " + System.currentTimeMillis()),
metrics);
ExecutorService executor = Executors.newFixedThreadPool(parties);
for (int i = 0; i < 5; i++) { // Multiple barrier cycles
for (int j = 0; j < parties; j++) {
final int cycle = i;
final int workerId = j;
executor.execute(() -> {
try {
// Simulate variable work time
Thread.sleep(workerId * 100L + cycle * 50L);
System.out.printf("Cycle %d - Worker %d waiting at barrier%n", cycle, workerId);
monitoredBarrier.awaitWithMetrics(3, TimeUnit.SECONDS);
System.out.printf("Cycle %d - Worker %d passed barrier%n", cycle, workerId);
} catch (Exception e) {
System.err.printf("Cycle %d - Worker %d failed: %s%n", 
cycle, workerId, e.getMessage());
}
});
}
Thread.sleep(1000); // Space out cycles
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
metrics.printMetrics();
}
}

8. Best Practices and Patterns

Pattern 1: Pipeline Processing with CyclicBarrier

import java.util.concurrent.*;
import java.util.*;
public class PipelineProcessingPattern {
public static class PipelineStage<T> {
private final CyclicBarrier stageBarrier;
private final String stageName;
private final Processor<T> processor;
public PipelineStage(String name, int parallelism, Processor<T> processor) {
this.stageName = name;
this.processor = processor;
this.stageBarrier = new CyclicBarrier(parallelism, 
() -> System.out.println("[" + name + "] Stage completed"));
}
public List<T> process(List<T> inputs) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(stageBarrier.getParties());
List<Future<T>> futures = new ArrayList<>();
System.out.println("[" + stageName + "] Starting processing of " + inputs.size() + " items");
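// Submit processing tasks. Only items that belong to a full batch wait at the
// barrier; leftover items would otherwise block forever because the barrier
// could never trip. This relies on the fixed pool starting tasks in order.
int parties = stageBarrier.getParties();
int fullBatchItems = inputs.size() - (inputs.size() % parties);
for (int i = 0; i < inputs.size(); i++) {
final T input = inputs.get(i);
final boolean inFullBatch = i < fullBatchItems;
Future<T> future = executor.submit(() -> {
T result = processor.process(input);
if (inFullBatch) {
stageBarrier.await(); // Synchronize after each full batch
}
return result;
});
futures.add(future);
}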
// Collect results
List<T> results = new ArrayList<>();
for (Future<T> future : futures) {
results.add(future.get());
}
executor.shutdown();
return results;
}
}
public interface Processor<T> {
T process(T input) throws Exception;
}
public static void main(String[] args) throws Exception {
// Create pipeline stages
PipelineStage<String> validationStage = new PipelineStage<>("VALIDATION", 2, 
input -> {
Thread.sleep(100);
if (input == null || input.trim().isEmpty()) {
throw new IllegalArgumentException("Invalid input");
}
return input.toUpperCase();
});
PipelineStage<String> processingStage = new PipelineStage<>("PROCESSING", 3,
input -> {
Thread.sleep(200);
return input + "-PROCESSED";
});
PipelineStage<String> formattingStage = new PipelineStage<>("FORMATTING", 2,
input -> {
Thread.sleep(150);
return "[" + input + "]";
});
// Process data through pipeline
List<String> inputs = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");
System.out.println("Starting pipeline processing...");
List<String> validated = validationStage.process(inputs);
System.out.println("After validation: " + validated);
List<String> processed = processingStage.process(validated);
System.out.println("After processing: " + processed);
List<String> formatted = formattingStage.process(processed);
System.out.println("Final results: " + formatted);
}
}

Conclusion

Key Benefits of CyclicBarrier:

  1. Synchronization: Perfect for coordinating multiple threads at specific points
  2. Reusability: Resets automatically after each trip, whereas a CountDownLatch cannot be reused once its count reaches zero
  3. Flexibility: Supports barrier actions and timeouts
  4. Parallel Processing: Ideal for parallel algorithms with synchronization points

Common Use Cases:

  • Parallel Algorithms: Divide and conquer problems
  • Multi-phase Processing: Data processing pipelines
  • Simulations: Synchronize simulation steps
  • Testing: Coordinate test execution
  • Batch Processing: Synchronize batch operations
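As an illustration of the simulation use case, the sketch below advances a tiny ring of cells in lock-step. Each worker owns one cell, and the barrier action swaps the "current" and "next" buffers between steps. The update rule (each cell adds its left neighbour) and all names (`SimulationStepSketch`, `simulate`) are assumptions chosen to keep the example small; it also assumes a non-empty input, since a barrier needs at least one party.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class SimulationStepSketch {

    // Runs `steps` synchronized steps; each cell becomes the sum of itself
    // and its left neighbour (the ring wraps around).
    public static int[] simulate(int[] initial, int steps) throws InterruptedException {
        int n = initial.length;
        int[][] buf = { initial.clone(), new int[n] }; // buf[0] = current, buf[1] = next

        // Barrier action: swap buffers once every cell of the step is computed
        CyclicBarrier barrier = new CyclicBarrier(n, () -> {
            int[] tmp = buf[0];
            buf[0] = buf[1];
            buf[1] = tmp;
        });

        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int cell = i;
            workers[i] = new Thread(() -> {
                try {
                    for (int s = 0; s < steps; s++) {
                        int left = buf[0][(cell + n - 1) % n];
                        buf[1][cell] = buf[0][cell] + left;
                        barrier.await(); // No one starts the next step before the swap
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another worker failed; abandon the simulation
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return buf[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [1, 2, 1]
        System.out.println(Arrays.toString(simulate(new int[] {1, 0, 0}, 2)));
    }
}
```

The plain arrays need no extra locking here because a successful await() establishes a happens-before edge between the writes before the barrier, the barrier action, and the reads after it.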

Best Practices:

  1. Handle Exceptions Properly: Always handle BrokenBarrierException and InterruptedException
  2. Use Timeouts: Prevent permanent blocking with await(timeout, unit)
  3. Monitor Barrier State: Check isBroken() and use reset() when needed
  4. Choose Appropriate Party Count: Match the number of threads that will actually call await(); if fewer arrive, every waiter stays blocked
  5. Keep Barrier Actions Light: Avoid long-running operations in barrier actions
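The first three practices can be combined in one small helper. The sketch below is one possible shape, not a standard API: `SafeAwaitSketch`, `Outcome`, and `awaitSafely` are hypothetical names. It waits with a timeout, keeps the three failure modes apart, and restores the interrupt flag only when the thread was actually interrupted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SafeAwaitSketch {

    public enum Outcome { PASSED, TIMED_OUT, BROKEN, INTERRUPTED }

    // Hypothetical helper: waits with a timeout and reports what happened
    public static Outcome awaitSafely(CyclicBarrier barrier, long timeoutMillis) {
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.PASSED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT; // This thread gave up; the barrier is now broken
        } catch (BrokenBarrierException e) {
            return Outcome.BROKEN; // Another party failed, or reset() was called
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the flag for real interrupts only
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        CyclicBarrier solo = new CyclicBarrier(1);
        System.out.println(awaitSafely(solo, 100)); // PASSED

        CyclicBarrier pair = new CyclicBarrier(2); // The partner never arrives
        System.out.println(awaitSafely(pair, 100)); // TIMED_OUT
        System.out.println(awaitSafely(pair, 100)); // BROKEN
    }
}
```

Returning an outcome instead of swallowing the exception lets the caller decide whether to retry, call reset(), or abort the whole group.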

CyclicBarrier is a powerful synchronization tool that enables complex coordination patterns in concurrent Java applications, and it is well suited to parallel processing and multi-stage workflows in which a fixed group of threads must repeatedly wait for one another.
