Java Concurrency Utilities: Complete Guide to Concurrent Programming

The Java Concurrency Utilities (the java.util.concurrent package and its subpackages) provide robust, high-performance building blocks for multithreaded code that go beyond basic synchronized blocks and direct use of the Thread class.


1. Executor Framework

Thread Pools and Executors

import java.util.concurrent.*;
public class ExecutorFrameworkExample {
public static void main(String[] args) {
demonstrateExecutors();
demonstrateScheduledExecutor();
}
public static void demonstrateExecutors() {
System.out.println("=== Executor Framework Examples ===");
// 1. Fixed Thread Pool
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 2. Cached Thread Pool
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. Single Thread Executor
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 4. Work Stealing Pool (ForkJoinPool)
ExecutorService workStealingPool = Executors.newWorkStealingPool();
// Submit tasks
for (int i = 1; i <= 10; i++) {
final int taskId = i;
fixedPool.submit(() -> {
System.out.println("Task " + taskId + " executed by " + 
Thread.currentThread().getName());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
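// Proper shutdown (the pools created only for illustration are shut down as well)
cachedPool.shutdown();
singleThread.shutdown();
workStealingPool.shutdown();
fixedPool.shutdown();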
try {
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow();
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void demonstrateScheduledExecutor() {
System.out.println("\n=== Scheduled Executor Examples ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// Schedule a task to run after 2 seconds
ScheduledFuture<?> future = scheduler.schedule(() -> {
System.out.println("Scheduled task executed at: " + System.currentTimeMillis());
}, 2, TimeUnit.SECONDS);
// Schedule a task to run repeatedly every 1 second after initial 1 second delay
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Fixed rate task executed at: " + System.currentTimeMillis());
}, 1, 1, TimeUnit.SECONDS);
// Schedule with fixed delay between executions
scheduler.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(500); // Simulate task execution time
System.out.println("Fixed delay task executed at: " + System.currentTimeMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 2, TimeUnit.SECONDS);
// Let it run for a while then shutdown
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
}
}
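
The "Proper shutdown" sequence in demonstrateExecutors() can be factored into a reusable helper. The following is a minimal sketch of the two-phase shutdown pattern described in the ExecutorService Javadoc; the class name ExecutorShutdown and the methods shutdownAndAwait and runDemo are hypothetical names chosen for this illustration, not part of the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the JDK.
final class ExecutorShutdown {

    /**
     * Stops accepting new tasks, waits up to the timeout for submitted tasks,
     * then cancels whatever is still running. Returns true if the pool terminated.
     */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new tasks, let already-submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that are still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    /** Runs five short tasks and returns how many completed before shutdown returned. */
    static int runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(completed::incrementAndGet);
        }
        boolean terminated = shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("terminated=" + terminated + ", completed=" + completed.get());
        return completed.get();
    }

    public static void main(String[] args) {
        runDemo();
    }
}
```

Tasks that were submitted before shutdown() still run to completion; only new submissions are rejected.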

Custom ThreadPoolExecutor

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class CustomThreadPoolExample {
public static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicLong taskCount = new AtomicLong(0);
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTime.set(System.currentTimeMillis());
System.out.println("Task starting in thread: " + t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
long duration = System.currentTimeMillis() - startTime.get();
startTime.remove();
long count = taskCount.incrementAndGet();
System.out.println("Task completed in " + duration + "ms. Total tasks: " + count);
if (t != null) {
System.err.println("Task failed with exception: " + t.getMessage());
}
}
@Override
protected void terminated() {
super.terminated();
System.out.println("ThreadPool terminated. Total tasks executed: " + taskCount.get());
}
}
public static void main(String[] args) {
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
// Rejected execution handler
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Executing task " + taskId + " in " + 
Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
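
To see what CallerRunsPolicy does when the pool is saturated, it helps to shrink the pool until rejection is guaranteed. The sketch below assumes a deliberately tiny configuration (one worker, one queue slot); the class name CallerRunsDemo and the method runOverflowTask are hypothetical. In the larger example above, 4 threads plus 10 queue slots leave room for 14 of the 15 tasks, so the remaining one can be expected to run on the submitting thread in the same way.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration.
final class CallerRunsDemo {

    /** Returns the name of the thread that ran the task the saturated pool rejected. */
    static String runOverflowTask() {
        // One worker and one queue slot, so a third task cannot be accepted.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();

        // Task 1 occupies the only worker until the latch opens.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 2 fills the single queue slot.
        pool.execute(() -> { });
        // Task 3 is rejected; CallerRunsPolicy runs it on the submitting thread.
        pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println("Submitting thread: " + Thread.currentThread().getName());
        System.out.println("Overflow task ran on: " + runOverflowTask());
    }
}
```

Because the rejected task runs on the caller, submission slows down while the pool is busy, which acts as a simple form of back-pressure.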

2. Callable and Future

import java.util.concurrent.*;
import java.util.*;
public class CallableFutureExample {
public static void main(String[] args) throws Exception {
demonstrateCallableFuture();
demonstrateCompletableFuture();
}
public static void demonstrateCallableFuture() throws Exception {
System.out.println("=== Callable and Future Examples ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Callable that returns a result
Callable<String> task = () -> {
Thread.sleep(1000);
return "Task completed at " + System.currentTimeMillis();
};
// Submit Callable and get Future
Future<String> future = executor.submit(task);
// Do other work while task is executing
System.out.println("Doing other work...");
// Get result (blocks if not ready)
try {
String result = future.get(2, TimeUnit.SECONDS);
System.out.println("Result: " + result);
} catch (TimeoutException e) {
System.err.println("Task timed out");
future.cancel(true);
}
// Multiple tasks example
List<Callable<Integer>> tasks = Arrays.asList(
() -> { Thread.sleep(1000); return 1; },
() -> { Thread.sleep(2000); return 2; },
() -> { Thread.sleep(1500); return 3; }
);
// Invoke all tasks
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> f : futures) {
if (!f.isCancelled()) {
System.out.println("Task result: " + f.get());
}
}
// Invoke any task
Integer firstResult = executor.invokeAny(tasks);
System.out.println("First completed task result: " + firstResult);
executor.shutdown();
}
public static void demonstrateCompletableFuture() {
System.out.println("\n=== CompletableFuture Examples ===");
// Simple CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// Transform result
CompletableFuture<String> greeting = future.thenApply(s -> s + " World!");
// Consume result
greeting.thenAccept(System.out::println);
// Combine multiple futures
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2 + "!")
.thenAccept(System.out::println);
// Exception handling
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
}).exceptionally(ex -> {
System.err.println("Error: " + ex.getMessage());
return "Fallback";
}).thenAccept(System.out::println);
// Wait for completion
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
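
demonstrateCompletableFuture() waits with Thread.sleep(3000), which only works if every stage happens to finish in time. A more dependable option is to block on the futures themselves with join() or CompletableFuture.allOf(). The sketch below shows both; the class CompletableFutureJoinDemo and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class for illustration.
final class CompletableFutureJoinDemo {

    /** Combines two async results and blocks until the combined value is ready. */
    static String combineGreeting() {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> "World");
        return hello.thenCombine(world, (a, b) -> a + " " + b + "!").join();
    }

    /** Starts one async task per input, waits for all of them, and collects results in input order. */
    static List<Integer> squareAll(List<Integer> inputs) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * n))
                .collect(Collectors.toList());
        // allOf completes when every future has completed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Each join() returns immediately because the futures are already done
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(combineGreeting());
        System.out.println(squareAll(Arrays.asList(1, 2, 3)));
    }
}
```

join() rethrows failures as an unchecked CompletionException, so it needs no checked-exception handling, unlike get().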

3. Concurrent Collections

import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.atomic.*;
public class ConcurrentCollectionsExample {
public static void main(String[] args) throws Exception {
demonstrateConcurrentHashMap();
demonstrateBlockingQueues();
demonstrateCopyOnWrite();
demonstrateSynchronizedCollections();
}
public static void demonstrateConcurrentHashMap() {
System.out.println("=== ConcurrentHashMap Examples ===");
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Atomic operations
map.put("key1", 1);
map.compute("key1", (k, v) -> v == null ? 1 : v + 1);
map.merge("key2", 1, Integer::sum);
// Thread-safe iteration
map.put("A", 1);
map.put("B", 2);
map.put("C", 3);
map.forEach(2, (k, v) -> 
System.out.println(Thread.currentThread().getName() + " - " + k + ": " + v));
// Search with parallelism threshold
String result = map.search(2, (k, v) -> v > 2 ? k : null);
System.out.println("Found key with value > 2: " + result);
// Reduce operation
Integer sum = map.reduceValues(2, Integer::sum);
System.out.println("Sum of values: " + sum);
}
public static void demonstrateBlockingQueues() throws InterruptedException {
System.out.println("\n=== BlockingQueue Examples ===");
// ArrayBlockingQueue - bounded
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(10);
// LinkedBlockingQueue - optionally bounded
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(20);
// PriorityBlockingQueue - unbounded, ordered
BlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue<>();
// SynchronousQueue - direct handoff
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// Producer-Consumer example
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i); // Blocks if queue is full
System.out.println("Produced: " + i);
Thread.sleep(100);
}
queue.put(-1); // Poison pill
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take(); // Blocks if queue is empty
if (item == -1) break;
System.out.println("Consumed: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
public static void demonstrateCopyOnWrite() {
System.out.println("\n=== CopyOnWrite Collections ===");
// CopyOnWriteArrayList - thread-safe List for read-heavy workloads
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");
// Iteration is safe even during modification
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add("Element-" + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread reader = new Thread(() -> {
for (String element : list) {
System.out.println("Read: " + element);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// CopyOnWriteArraySet
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("Apple");
set.add("Banana");
set.addAll(list);
System.out.println("Set contents: " + set);
}
public static void demonstrateSynchronizedCollections() {
System.out.println("\n=== Synchronized Collections ===");
// Creating synchronized collections
List<String> syncList = Collections.synchronizedList(new ArrayList<>());
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
Set<String> syncSet = Collections.synchronizedSet(new HashSet<>());
// Safe to use from multiple threads
syncList.add("Item1");
syncList.add("Item2");
// Manual synchronization required for iteration
synchronized(syncList) {
for (String item : syncList) {
System.out.println("List item: " + item);
}
}
// Concurrent collections are generally preferred
}
}
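
The atomic ConcurrentHashMap operations shown in demonstrateConcurrentHashMap() matter most when several threads update the same key. The sketch below counts words from multiple threads with merge(), which performs the read-modify-write for a key atomically; a separate get() followed by put() could lose updates. The class name ConcurrentWordCount and the method count are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class ConcurrentWordCount {

    /** Counts word occurrences using the given number of worker threads. */
    static Map<String, Integer> count(List<String> words, int threads) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String word : words) {
            // merge inserts 1 for a new key or atomically adds 1 to the existing value
            pool.execute(() -> counts.merge(word, 1, Integer::sum));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.println(count(words, 4));
    }
}
```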

4. Locks and Synchronizers

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class LocksAndSynchronizers {
public static void main(String[] args) throws Exception {
demonstrateReentrantLock();
demonstrateReadWriteLock();
demonstrateStampedLock();
demonstrateCountDownLatch();
demonstrateCyclicBarrier();
demonstrateSemaphore();
demonstrateExchanger();
demonstratePhaser();
}
public static void demonstrateReentrantLock() {
System.out.println("=== ReentrantLock Examples ===");
ReentrantLock lock = new ReentrantLock();
AtomicInteger counter = new AtomicInteger(0);
Runnable task = () -> {
lock.lock();
try {
int current = counter.incrementAndGet();
System.out.println(Thread.currentThread().getName() + 
" acquired lock, counter: " + current);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executor.execute(task);
}
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final counter value: " + counter.get());
// getHoldCount() reports how many holds the current thread has on the lock, not a thread count
System.out.println("Hold count for current thread: " + lock.getHoldCount());
System.out.println("Queue length: " + lock.getQueueLength());
}
public static void demonstrateReadWriteLock() {
System.out.println("\n=== ReadWriteLock Examples ===");
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
List<String> data = new ArrayList<>();
data.add("Initial Data");
// Multiple readers can read concurrently
Runnable reader = () -> {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " reading: " + data);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
readLock.unlock();
}
};
// Only one writer at a time
Runnable writer = () -> {
writeLock.lock();
try {
String newData = "Data from " + Thread.currentThread().getName();
data.add(newData);
System.out.println(Thread.currentThread().getName() + " wrote: " + newData);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
};
ExecutorService executor = Executors.newFixedThreadPool(5);
// Start 3 readers and 2 writers
for (int i = 0; i < 3; i++) {
executor.execute(reader);
}
for (int i = 0; i < 2; i++) {
executor.execute(writer);
}
for (int i = 0; i < 2; i++) {
executor.execute(reader);
}
executor.shutdown();
}
public static void demonstrateStampedLock() {
System.out.println("\n=== StampedLock Examples ===");
StampedLock lock = new StampedLock();
// Lambdas can only capture effectively final locals, so the shared value lives in a one-element array
final String[] sharedData = { "Initial Value" };
// Optimistic reading
Runnable optimisticReader = () -> {
long stamp = lock.tryOptimisticRead();
String dataSnapshot = sharedData[0];
if (!lock.validate(stamp)) {
// Fallback to pessimistic read
stamp = lock.readLock();
try {
dataSnapshot = sharedData[0];
} finally {
lock.unlockRead(stamp);
}
}
System.out.println(Thread.currentThread().getName() + " read: " + dataSnapshot);
};
// Write operation
Runnable writer = () -> {
long stamp = lock.writeLock();
try {
sharedData[0] = "Modified by " + Thread.currentThread().getName();
System.out.println(Thread.currentThread().getName() + " wrote: " + sharedData[0]);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlockWrite(stamp);
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(optimisticReader);
executor.execute(writer);
executor.execute(optimisticReader);
executor.shutdown();
}
public static void demonstrateCountDownLatch() throws InterruptedException {
System.out.println("\n=== CountDownLatch Examples ===");
int workerCount = 3;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
System.out.println("Worker " + workerId + " waiting for start signal");
startSignal.await(); // Wait for start signal
System.out.println("Worker " + workerId + " working...");
Thread.sleep(1000 + workerId * 100); // Simulate work
System.out.println("Worker " + workerId + " completed");
doneSignal.countDown(); // Signal completion
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
Thread.sleep(1000); // Let workers get ready
System.out.println("Starting all workers...");
startSignal.countDown(); // Start all workers
doneSignal.await(); // Wait for all workers to complete
System.out.println("All workers completed!");
}
public static void demonstrateCyclicBarrier() {
System.out.println("\n=== CyclicBarrier Examples ===");
int partySize = 3;
CyclicBarrier barrier = new CyclicBarrier(partySize, () -> {
System.out.println("All parties reached barrier! Proceeding...");
});
for (int i = 0; i < partySize; i++) {
final int partyId = i;
new Thread(() -> {
try {
System.out.println("Party " + partyId + " starting journey");
Thread.sleep(1000 + partyId * 500); // Different arrival times
System.out.println("Party " + partyId + " reached barrier");
barrier.await(); // Wait for other parties
System.out.println("Party " + partyId + " continuing after barrier");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
public static void demonstrateSemaphore() {
System.out.println("\n=== Semaphore Examples ===");
Semaphore semaphore = new Semaphore(3); // 3 permits available
AtomicInteger counter = new AtomicInteger(0);
Runnable limitedTask = () -> {
try {
System.out.println(Thread.currentThread().getName() + " acquiring permit");
semaphore.acquire();
try {
int current = counter.incrementAndGet();
System.out.println(Thread.currentThread().getName() + 
" got permit. Active: " + current);
Thread.sleep(2000); // Simulate work
} finally {
// Release in finally so an interrupt during the work cannot leak the permit
counter.decrementAndGet();
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released permit");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(limitedTask);
}
executor.shutdown();
}
public static void demonstrateExchanger() throws InterruptedException {
System.out.println("\n=== Exchanger Examples ===");
Exchanger<String> exchanger = new Exchanger<>();
Thread producer = new Thread(() -> {
try {
String data = "Data from Producer";
System.out.println("Producer sending: " + data);
String response = exchanger.exchange(data);
System.out.println("Producer received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
String data = "Data from Consumer";
System.out.println("Consumer sending: " + data);
String response = exchanger.exchange(data);
System.out.println("Consumer received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
public static void demonstratePhaser() {
System.out.println("\n=== Phaser Examples ===");
Phaser phaser = new Phaser(1); // Register main thread
for (int i = 0; i < 3; i++) {
final int taskId = i;
phaser.register(); // Register new party
new Thread(() -> {
System.out.println("Task " + taskId + " starting phase " + phaser.getPhase());
phaser.arriveAndAwaitAdvance(); // Wait for all tasks
System.out.println("Task " + taskId + " completed phase " + (phaser.getPhase() - 1));
phaser.arriveAndDeregister(); // Task completed
}).start();
}
// Main thread controls phaser
phaser.arriveAndAwaitAdvance();
System.out.println("All tasks completed first phase");
phaser.arriveAndDeregister();
}
}
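
The optimistic-read pattern from demonstrateStampedLock() is easier to follow when the lock guards fields of a class. The sketch below is modeled on the Point example in the StampedLock Javadoc; the class name StampedPoint is a hypothetical name for this illustration.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical class for illustration, modeled on the Point example in the StampedLock Javadoc.
final class StampedPoint {
    private final StampedLock lock = new StampedLock();
    private double x;
    private double y;

    /** Exclusive write: both coordinates change under the write lock. */
    void move(double dx, double dy) {
        long stamp = lock.writeLock();
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /** Optimistic read that falls back to a full read lock if a write intervened. */
    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double currentX = x; // copy the fields first, validate afterwards
        double currentY = y;
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    public static void main(String[] args) {
        StampedPoint point = new StampedPoint();
        point.move(3, 4);
        System.out.println("Distance: " + point.distanceFromOrigin());
    }
}
```

The values read optimistically must only be used after validate() succeeds, which is why the fields are copied into locals before the check.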

5. Atomic Variables and Non-blocking Algorithms

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
public class AtomicExamples {
public static void main(String[] args) {
demonstrateAtomicVariables();
demonstrateAtomicArrays();
demonstrateAtomicFieldUpdaters();
demonstrateAccumulators();
}
public static void demonstrateAtomicVariables() {
System.out.println("=== Atomic Variables Examples ===");
AtomicInteger atomicInt = new AtomicInteger(0);
AtomicLong atomicLong = new AtomicLong(0L);
AtomicBoolean atomicBool = new AtomicBoolean(false);
AtomicReference<String> atomicRef = new AtomicReference<>("initial");
// Basic operations
atomicInt.set(42);
System.out.println("AtomicInt value: " + atomicInt.get());
// Compare-and-set (CAS) operation
boolean success = atomicInt.compareAndSet(42, 100);
System.out.println("CAS successful: " + success + ", new value: " + atomicInt.get());
// Atomic updates
System.out.println("Increment and get: " + atomicInt.incrementAndGet());
System.out.println("Add and get: " + atomicInt.addAndGet(50));
// Functional updates
atomicInt.updateAndGet(x -> x * 2);
System.out.println("After updateAndGet: " + atomicInt.get());
// Atomic reference example
atomicRef.set("Hello");
atomicRef.accumulateAndGet(" World", (a, b) -> a + b);
System.out.println("AtomicReference: " + atomicRef.get());
}
public static void demonstrateAtomicArrays() {
System.out.println("\n=== Atomic Arrays Examples ===");
AtomicIntegerArray atomicIntArray = new AtomicIntegerArray(5);
AtomicLongArray atomicLongArray = new AtomicLongArray(3);
AtomicReferenceArray<String> atomicRefArray = new AtomicReferenceArray<>(4);
// Initialize arrays
for (int i = 0; i < atomicIntArray.length(); i++) {
atomicIntArray.set(i, i * 10);
}
// Atomic operations on array elements
atomicIntArray.getAndAdd(2, 5); // Add 5 to element at index 2
atomicIntArray.compareAndSet(1, 10, 100); // CAS operation
// Print array contents
System.out.print("AtomicIntArray: ");
for (int i = 0; i < atomicIntArray.length(); i++) {
System.out.print(atomicIntArray.get(i) + " ");
}
System.out.println();
// Reference array
atomicRefArray.set(0, "First");
atomicRefArray.set(1, "Second");
atomicRefArray.compareAndSet(1, "Second", "Updated");
System.out.print("AtomicReferenceArray: ");
for (int i = 0; i < atomicRefArray.length(); i++) {
System.out.print(atomicRefArray.get(i) + " ");
}
System.out.println();
}
public static void demonstrateAtomicFieldUpdaters() {
System.out.println("\n=== Atomic Field Updaters Examples ===");
class Counter {
// Fields must be volatile and accessible to the code that creates the updater
volatile int count;
volatile long total;
public int getCount() { return count; }
public long getTotal() { return total; }
}
Counter counter = new Counter();
// Create field updaters
AtomicIntegerFieldUpdater<Counter> countUpdater =
AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
AtomicLongFieldUpdater<Counter> totalUpdater =
AtomicLongFieldUpdater.newUpdater(Counter.class, "total");
// Use updaters instead of synchronized blocks
countUpdater.incrementAndGet(counter);
totalUpdater.addAndGet(counter, 100L);
System.out.println("Count: " + counter.getCount() + ", Total: " + counter.getTotal());
// CAS operation with field updater
boolean updated = countUpdater.compareAndSet(counter, 1, 42);
System.out.println("CAS successful: " + updated + ", new count: " + counter.getCount());
}
public static void demonstrateAccumulators() {
System.out.println("\n=== Accumulators and Adders Examples ===");
// LongAdder - better performance under high contention
LongAdder longAdder = new LongAdder();
LongAccumulator longAccumulator = new LongAccumulator(Long::max, 0L);
// DoubleAdder and DoubleAccumulator also available
// Multiple threads updating adders
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
longAdder.increment();
longAccumulator.accumulate(Thread.currentThread().getId());
});
}
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("LongAdder sum: " + longAdder.sum());
System.out.println("LongAccumulator max: " + longAccumulator.get());
// Reset accumulators
longAdder.reset();
longAccumulator.reset();
System.out.println("After reset - LongAdder: " + longAdder.sum() + 
", LongAccumulator: " + longAccumulator.get());
}
}
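
The examples above use compareAndSet on single values; a non-blocking algorithm typically wraps that call in a retry loop. The sketch below is a minimal lock-free stack (the classic Treiber stack) built on AtomicReference; the class name LockFreeStack and the helper pushAndDrain are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class for illustration: a minimal Treiber stack.
final class LockFreeStack<T> {

    private static final class Node<T> {
        final T value;
        final Node<T> next;

        Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    void push(T value) {
        Node<T> current;
        Node<T> updated;
        do {
            current = head.get();
            updated = new Node<>(value, current);
        } while (!head.compareAndSet(current, updated)); // retry if another thread changed head
    }

    /** Returns the top value, or null if the stack is empty. */
    T pop() {
        Node<T> current;
        do {
            current = head.get();
            if (current == null) {
                return null;
            }
        } while (!head.compareAndSet(current, current.next));
        return current.value;
    }

    /** Pushes from several threads at once, then counts how many elements can be popped. */
    static int pushAndDrain(int threads, int perThread) {
        LockFreeStack<Integer> stack = new LockFreeStack<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.push(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        int popped = 0;
        while (stack.pop() != null) {
            popped++;
        }
        return popped;
    }

    public static void main(String[] args) {
        System.out.println("Popped: " + pushAndDrain(4, 1000));
    }
}
```

No thread ever blocks here: a failed compareAndSet means another thread made progress, and the loser simply re-reads the head and tries again.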

6. Practical Examples and Patterns

Producer-Consumer with ThreadPool

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
public class ProducerConsumerPattern {
public static class DataProcessor {
private final ExecutorService producerExecutor;
private final ExecutorService consumerExecutor;
private final BlockingQueue<DataItem> queue;
private final CountDownLatch completionLatch;
private final AtomicInteger processedCount = new AtomicInteger(0);
private final int producerCount;
private final int consumerCount;
public DataProcessor(int producerCount, int consumerCount, int queueCapacity) {
this.producerCount = producerCount;
this.consumerCount = consumerCount;
this.producerExecutor = Executors.newFixedThreadPool(producerCount);
this.consumerExecutor = Executors.newFixedThreadPool(consumerCount);
this.queue = new ArrayBlockingQueue<>(queueCapacity);
this.completionLatch = new CountDownLatch(producerCount);
}
public void startProcessing(int itemsPerProducer) {
// Start producers (use the configured count; the latch count shrinks as producers finish)
for (int i = 0; i < producerCount; i++) {
final int producerId = i;
producerExecutor.execute(() -> produceData(producerId, itemsPerProducer));
}
// Start one consumer per consumer thread
for (int i = 0; i < consumerCount; i++) {
final int consumerId = i;
consumerExecutor.execute(() -> consumeData(consumerId));
}
}
}
private void produceData(int producerId, int itemCount) {
try {
Random random = new Random();
for (int i = 0; i < itemCount; i++) {
DataItem item = new DataItem(producerId, i, 
"Data-" + producerId + "-" + i, random.nextInt(1000));
queue.put(item); // Blocks if queue is full
System.out.println("Producer " + producerId + " produced: " + item);
Thread.sleep(random.nextInt(100)); // Simulate production time
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
completionLatch.countDown();
System.out.println("Producer " + producerId + " finished");
}
}
private void consumeData(int consumerId) {
try {
while (true) {
// Stop when all producers are done and queue is empty
if (completionLatch.getCount() == 0 && queue.isEmpty()) {
break;
}
// Poll with timeout to avoid permanent blocking
DataItem item = queue.poll(100, TimeUnit.MILLISECONDS);
if (item != null) {
processItem(consumerId, item);
processedCount.incrementAndGet();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Consumer " + consumerId + " finished");
}
private void processItem(int consumerId, DataItem item) throws InterruptedException {
System.out.println("Consumer " + consumerId + " processing: " + item);
Thread.sleep(item.getProcessingTime()); // Simulate processing
System.out.println("Consumer " + consumerId + " completed: " + item);
}
public void awaitCompletion() throws InterruptedException {
producerExecutor.shutdown();
consumerExecutor.shutdown();
if (!producerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
producerExecutor.shutdownNow();
}
if (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
consumerExecutor.shutdownNow();
}
System.out.println("Processing completed. Total items processed: " + processedCount.get());
}
}
public static class DataItem {
private final int producerId;
private final int itemId;
private final String data;
private final int processingTime;
public DataItem(int producerId, int itemId, String data, int processingTime) {
this.producerId = producerId;
this.itemId = itemId;
this.data = data;
this.processingTime = processingTime;
}
public int getProcessingTime() { return processingTime; }
@Override
public String toString() {
return String.format("DataItem[producer=%d, id=%d, data=%s, time=%dms]", 
producerId, itemId, data, processingTime);
}
}
public static void main(String[] args) throws Exception {
DataProcessor processor = new DataProcessor(3, 2, 10);
processor.startProcessing(5);
processor.awaitCompletion();
}
}

Rate Limiter with Semaphore

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class RateLimiterExample {
public static class SimpleRateLimiter {
private final Semaphore semaphore;
private final int maxPermits;
public SimpleRateLimiter(int maxPermits) {
this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
startPermitRefill();
}
private void startPermitRefill() {
// Daemon thread so the refill task does not keep the JVM alive after main finishes
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "permit-refill");
t.setDaemon(true);
return t;
});
scheduler.scheduleAtFixedRate(() -> {
// Only this thread releases permits, so the count can never exceed maxPermits
if (semaphore.availablePermits() < maxPermits) {
semaphore.release(1);
System.out.println("Refilled permit. Available: " + semaphore.availablePermits());
}
}, 1, 1, TimeUnit.SECONDS);
}
// Permits are consumed, not returned; the refill task is the only source of new permits
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
public int getAvailablePermits() {
return semaphore.availablePermits();
}
}
public static void main(String[] args) throws Exception {
SimpleRateLimiter rateLimiter = new SimpleRateLimiter(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.execute(() -> {
try {
if (rateLimiter.tryAcquire()) {
System.out.println("Task " + taskId + " acquired permit");
Thread.sleep(2000); // Simulate work
System.out.println("Task " + taskId + " completed");
} else {
System.out.println("Task " + taskId + " rejected - rate limit exceeded");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(200); // Space out task submission
}
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
}

Conclusion

Key Concurrency Utilities Summary:

Utility | Purpose | Use Case
ExecutorService | Thread pool management | Background task execution
Future/Callable | Asynchronous computation | Parallel processing with results
CompletableFuture | Reactive programming | Chaining async operations
ConcurrentHashMap | Thread-safe maps | Shared data storage
BlockingQueue | Producer-Consumer | Task queues, buffering
ReentrantLock | Advanced locking | Fine-grained synchronization
ReadWriteLock | Multiple readers | Read-heavy, write-occasional
CountDownLatch | One-time synchronization | Wait for initialization
CyclicBarrier | Reusable synchronization | Parallel phase coordination
Semaphore | Resource limiting | Rate limiting, pool access
Atomic Variables | Lock-free programming | Counters, flags, state

Best Practices:

  1. Prefer high-level utilities over low-level synchronized blocks
  2. Use thread pools instead of creating threads directly
  3. Choose the right concurrent collection for your access pattern
  4. Understand the trade-offs between different lock implementations
  5. Always handle InterruptedException properly
  6. Use atomic variables for simple state management
  7. Consider CompletableFuture for complex async workflows
  8. Test thoroughly - concurrency bugs are hard to reproduce
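
Practice 5 in the list above is the one most often gotten wrong, so here is a minimal sketch of what handling InterruptedException properly usually means: either propagate the exception or restore the interrupt flag so that code further up the stack can still see the request to stop. The class name InterruptHandling and the method pause are hypothetical.

```java
// Hypothetical helper class for illustration.
final class InterruptHandling {

    /** Sleeps for the given time; returns false if interrupted, leaving the interrupt flag set. */
    static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Catching the exception clears the flag, so set it again for callers
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            int ticks = 0;
            while (pause(50)) { // the loop ends as soon as the thread is interrupted
                ticks++;
            }
            System.out.println("Worker stopping after " + ticks + " ticks, interrupted="
                    + Thread.currentThread().isInterrupted());
        });
        worker.start();
        Thread.sleep(200);
        worker.interrupt(); // request cooperative cancellation
        worker.join();
    }
}
```

Swallowing the exception with an empty catch block would hide the cancellation request, and the worker loop above would keep running.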

Java Concurrency Utilities provide robust, tested building blocks for creating high-performance, thread-safe applications while avoiding many common concurrency pitfalls.
