Concurrent Collections in Java

Concurrent collections in Java are thread-safe implementations of collection interfaces designed for multi-threaded environments. They provide better performance than synchronized wrappers by using advanced synchronization techniques.

1. ConcurrentMap Implementations

ConcurrentHashMap

import java.util.concurrent.*;
import java.util.*;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
// Creating ConcurrentHashMap
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
// Basic operations - thread-safe
concurrentMap.put("John", 25);
concurrentMap.put("Alice", 30);
concurrentMap.putIfAbsent("John", 40); // Won't replace existing
concurrentMap.putIfAbsent("Bob", 35);  // Will be added
System.out.println("Map: " + concurrentMap);
// Atomic operations
concurrentMap.compute("John", (k, v) -> v + 1); // John becomes 26
concurrentMap.merge("Alice", 1, Integer::sum); // Alice becomes 31
System.out.println("After atomic operations: " + concurrentMap);
// Search operations
String result = concurrentMap.search(1, (k, v) -> v > 30 ? k : null);
System.out.println("First person older than 30: " + result);
// ForEach operations
concurrentMap.forEach(1, (k, v) -> System.out.println(k + ": " + v));
// Reduce operations
int totalAge = concurrentMap.reduceValues(1, Integer::sum);
System.out.println("Total age: " + totalAge);
// Working with ConcurrentHashMap in multi-threaded environment
multiThreadedExample();
}
public static void multiThreadedExample() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(3);
// Multiple threads writing concurrently
for (int i = 0; i < 10; i++) {
final int threadId = i;
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
String key = "key-" + threadId + "-" + j;
map.put(key, threadId * 100 + j);
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final map size: " + map.size());
}
}

ConcurrentHashMap Advanced Features

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class ConcurrentHashMapAdvanced {
public static void main(String[] args) {
ConcurrentHashMap<String, AtomicInteger> wordCount = new ConcurrentHashMap<>();
// Thread-safe accumulation
List<String> words = Arrays.asList("apple", "banana", "apple", "orange", "banana", "apple");
words.parallelStream().forEach(word -> {
wordCount.computeIfAbsent(word, k -> new AtomicInteger(0)).incrementAndGet();
});
System.out.println("Word counts: " + wordCount);
// Bulk operations
ConcurrentHashMap<String, String> config = new ConcurrentHashMap<>();
config.put("host", "localhost");
config.put("port", "8080");
config.put("timeout", "30");
// Replace all values
config.replaceAll((k, v) -> k.toUpperCase() + "_" + v);
System.out.println("Transformed config: " + config);
// Compute if absent/present
ConcurrentHashMap<String, List<String>> userPreferences = new ConcurrentHashMap<>();
List<String> preferences = userPreferences.computeIfAbsent("user1", 
k -> new CopyOnWriteArrayList<>());
preferences.add("theme:dark");
userPreferences.computeIfPresent("user1", (k, v) -> {
v.add("lang:en");
return v;
});
System.out.println("User preferences: " + userPreferences);
}
}

2. Concurrent Queues

Blocking Queues

import java.util.concurrent.*;
import java.util.*;
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// ArrayBlockingQueue - bounded queue
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(3);
// LinkedBlockingQueue - optionally bounded
BlockingQueue<Integer> linkedQueue = new LinkedBlockingQueue<>(5);
// PriorityBlockingQueue - unbounded priority queue
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// SynchronousQueue - direct handoff
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
demonstrateArrayBlockingQueue();
demonstrateProducerConsumer();
}
public static void demonstrateArrayBlockingQueue() throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// Add elements
queue.put("Item1");
queue.put("Item2");
queue.put("Item3");
System.out.println("Queue: " + queue);
// Try to add more - will block until space available
new Thread(() -> {
try {
queue.put("Item4"); // This will block until space is available
System.out.println("Item4 added");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Thread.sleep(1000);
// Remove an item to make space
String item = queue.take();
System.out.println("Removed: " + item);
}
public static void demonstrateProducerConsumer() {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
queue.put(-1); // Poison pill
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer item = queue.take();
if (item == -1) break; // Poison pill
System.out.println("Consumed: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
class Task implements Comparable<Task> {
private String name;
private int priority;
public Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // Higher priority first
}
@Override
public String toString() {
return name + "(" + priority + ")";
}
}

Non-Blocking Queues

import java.util.concurrent.*;
public class NonBlockingQueueExample {
public static void main(String[] args) {
// ConcurrentLinkedQueue - non-blocking FIFO
ConcurrentLinkedQueue<String> nonBlockingQueue = new ConcurrentLinkedQueue<>();
// ConcurrentLinkedDeque - non-blocking double-ended queue
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
demonstrateConcurrentLinkedQueue();
}
public static void demonstrateConcurrentLinkedQueue() {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// Multiple producers
Thread producer1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
queue.offer("Producer1-" + i);
}
});
Thread producer2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
queue.offer("Producer2-" + i);
}
});
// Single consumer
Thread consumer = new Thread(() -> {
int count = 0;
while (count < 10) {
String item = queue.poll();
if (item != null) {
System.out.println("Consumed: " + item);
count++;
}
}
});
producer1.start();
producer2.start();
consumer.start();
try {
producer1.join();
producer2.join();
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final queue size: " + queue.size());
}
}

3. Concurrent Lists and Sets

CopyOnWrite Collections

import java.util.concurrent.*;
import java.util.*;
public class CopyOnWriteExample {
public static void main(String[] args) {
// CopyOnWriteArrayList - thread-safe list
CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();
// CopyOnWriteArraySet - thread-safe set
CopyOnWriteArraySet<String> copyOnWriteSet = new CopyOnWriteArraySet<>();
demonstrateCopyOnWriteArrayList();
demonstrateCopyOnWriteIteration();
}
public static void demonstrateCopyOnWriteArrayList() {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(
Arrays.asList("A", "B", "C")
);
// Multiple threads reading and writing
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
String newItem = "New-" + i;
list.add(newItem);
System.out.println("Added: " + newItem);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread reader = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.println("Current list: " + list);
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void demonstrateCopyOnWriteIteration() {
CopyOnWriteArrayList<Integer> numbers = new CopyOnWriteArrayList<>(
Arrays.asList(1, 2, 3, 4, 5)
);
// Iterator uses snapshot of the list when created
Iterator<Integer> iterator = numbers.iterator();
// Modify list while iterating
numbers.add(6);
numbers.remove(Integer.valueOf(3));
// Iterator still sees original snapshot
System.out.println("Iterator sees:");
while (iterator.hasNext()) {
System.out.println(iterator.next()); // Won't see changes
}
System.out.println("Actual list: " + numbers);
}
}

ConcurrentSkipList Collections

import java.util.concurrent.*;
import java.util.*;
public class ConcurrentSkipListExample {
public static void main(String[] args) {
// ConcurrentSkipListSet - concurrent sorted set
ConcurrentSkipListSet<Integer> skipListSet = new ConcurrentSkipListSet<>();
// ConcurrentSkipListMap - concurrent sorted map
ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();
demonstrateConcurrentSkipListSet();
demonstrateConcurrentSkipListMap();
}
public static void demonstrateConcurrentSkipListSet() {
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
// Add elements from multiple threads
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int start = i * 10;
executor.submit(() -> {
for (int j = 0; j < 10; j++) {
set.add(start + j);
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Sorted set: " + set);
System.out.println("First: " + set.first());
System.out.println("Last: " + set.last());
System.out.println("Subset [20-40]: " + set.subSet(20, 40));
}
public static void demonstrateConcurrentSkipListMap() {
ConcurrentSkipListMap<String, Employee> employeeMap = new ConcurrentSkipListMap<>();
employeeMap.put("John", new Employee("John", 50000));
employeeMap.put("Alice", new Employee("Alice", 60000));
employeeMap.put("Bob", new Employee("Bob", 55000));
employeeMap.put("Carol", new Employee("Carol", 70000));
System.out.println("Employee map: " + employeeMap);
// Navigable map operations
System.out.println("First entry: " + employeeMap.firstEntry());
System.out.println("Last entry: " + employeeMap.lastEntry());
System.out.println("Ceiling key for 'C': " + employeeMap.ceilingKey("C"));
System.out.println("Floor key for 'C': " + employeeMap.floorKey("C"));
// Head and tail views
System.out.println("Head map (before 'C'): " + employeeMap.headMap("C"));
System.out.println("Tail map (after 'C'): " + employeeMap.tailMap("C"));
}
}
class Employee {
private String name;
private int salary;
public Employee(String name, int salary) {
this.name = name;
this.salary = salary;
}
@Override
public String toString() {
return name + "($" + salary + ")";
}
}

4. Concurrent Utility Classes

CountDownLatch

import java.util.concurrent.*;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
new Thread(new Worker(startSignal, doneSignal, "Worker-" + i)).start();
}
System.out.println("Main thread preparing resources...");
Thread.sleep(2000);
System.out.println("Starting all workers...");
startSignal.countDown(); // Release all workers
System.out.println("Main thread waiting for workers to complete...");
doneSignal.await(); // Wait for all workers to complete
System.out.println("All workers completed. Main thread continuing.");
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private final String name;
public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, String name) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " waiting to start...");
startSignal.await(); // Wait for start signal
System.out.println(name + " working...");
Thread.sleep(1000 + (int)(Math.random() * 2000));
System.out.println(name + " completed work.");
doneSignal.countDown(); // Signal completion
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

CyclicBarrier

import java.util.concurrent.*;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, 
() -> System.out.println("All threads reached barrier! Continuing..."));
for (int i = 0; i < threadCount; i++) {
new Thread(new Task(barrier, "Task-" + i)).start();
}
}
}
class Task implements Runnable {
private final CyclicBarrier barrier;
private final String name;
public Task(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " starting phase 1");
Thread.sleep(1000 + (int)(Math.random() * 2000));
System.out.println(name + " finished phase 1, waiting at barrier");
barrier.await();
System.out.println(name + " starting phase 2");
Thread.sleep(1000 + (int)(Math.random() * 2000));
System.out.println(name + " finished phase 2, waiting at barrier");
barrier.await();
System.out.println(name + " completed all phases");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}

Exchanger

import java.util.concurrent.*;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(new Producer(exchanger)).start();
new Thread(new Consumer(exchanger)).start();
}
}
class Producer implements Runnable {
private final Exchanger<String> exchanger;
private static int counter = 0;
public Producer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
String data = "Data-" + (++counter);
System.out.println("Producer sending: " + data);
// Exchange data with consumer and get response
String response = exchanger.exchange(data);
System.out.println("Producer received: " + response);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final Exchanger<String> exchanger;
public Consumer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
// Receive data from producer
String data = exchanger.exchange(null);
System.out.println("Consumer received: " + data);
// Process and send response
String response = "Ack-" + data;
System.out.println("Consumer sending: " + response);
// Exchange response with producer
exchanger.exchange(response);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

5. Performance Comparison

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class PerformanceComparison {
private static final int THREAD_COUNT = 10;
private static final int OPERATIONS_PER_THREAD = 10000;
public static void main(String[] args) throws InterruptedException {
System.out.println("Performance Comparison:");
// Test HashMap vs ConcurrentHashMap
testMapPerformance();
// Test ArrayList vs CopyOnWriteArrayList
testListPerformance();
// Test Queue implementations
testQueuePerformance();
}
public static void testMapPerformance() throws InterruptedException {
// HashMap with synchronization
Map<Integer, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
// ConcurrentHashMap
ConcurrentHashMap<Integer, Integer> concurrentMap = new ConcurrentHashMap<>();
long syncTime = testMap(syncMap, "Synchronized HashMap");
long concurrentTime = testMap(concurrentMap, "ConcurrentHashMap");
System.out.printf("ConcurrentHashMap is %.2fx faster%n%n", 
(double) syncTime / concurrentTime);
}
public static long testMap(Map<Integer, Integer> map, String name) throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
new Thread(() -> {
for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
int key = threadId * OPERATIONS_PER_THREAD + j;
map.put(key, key);
map.get(key);
}
latch.countDown();
}).start();
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("%s: %d ms%n", name, (endTime - startTime));
return endTime - startTime;
}
public static void testListPerformance() throws InterruptedException {
// ArrayList with synchronization
List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
// CopyOnWriteArrayList
CopyOnWriteArrayList<Integer> copyOnWriteList = new CopyOnWriteArrayList<>();
long syncTime = testList(syncList, "Synchronized ArrayList");
long copyOnWriteTime = testList(copyOnWriteList, "CopyOnWriteArrayList");
System.out.printf("For read-heavy: CopyOnWriteArrayList is better%n");
System.out.printf("For write-heavy: Synchronized ArrayList is %.2fx faster%n%n", 
(double) copyOnWriteTime / syncTime);
}
public static long testList(List<Integer> list, String name) throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
new Thread(() -> {
// 80% reads, 20% writes
for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
if (j % 5 == 0) {
// Write operation
list.add(threadId * OPERATIONS_PER_THREAD + j);
} else {
// Read operation
if (!list.isEmpty()) {
list.get(list.size() - 1);
}
}
}
latch.countDown();
}).start();
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("%s: %d ms%n", name, (endTime - startTime));
return endTime - startTime;
}
public static void testQueuePerformance() throws InterruptedException {
BlockingQueue<Integer> arrayQueue = new ArrayBlockingQueue<>(1000);
BlockingQueue<Integer> linkedQueue = new LinkedBlockingQueue<>(1000);
ConcurrentLinkedQueue<Integer> concurrentQueue = new ConcurrentLinkedQueue<>();
long arrayTime = testQueue(arrayQueue, "ArrayBlockingQueue");
long linkedTime = testQueue(linkedQueue, "LinkedBlockingQueue");
long concurrentTime = testQueue(concurrentQueue, "ConcurrentLinkedQueue");
System.out.printf("ConcurrentLinkedQueue is %.2fx faster than ArrayBlockingQueue%n%n", 
(double) arrayTime / concurrentTime);
}
public static long testQueue(Queue<Integer> queue, String name) throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT * 2);
// Producers
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
new Thread(() -> {
for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
if (queue instanceof BlockingQueue) {
try {
((BlockingQueue<Integer>) queue).put(threadId * OPERATIONS_PER_THREAD + j);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
queue.offer(threadId * OPERATIONS_PER_THREAD + j);
}
}
latch.countDown();
}).start();
}
// Consumers
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
if (queue instanceof BlockingQueue) {
try {
((BlockingQueue<Integer>) queue).take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
queue.poll();
}
}
latch.countDown();
}).start();
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("%s: %d ms%n", name, (endTime - startTime));
return endTime - startTime;
}
}

Best Practices

  1. Choose the right concurrent collection based on your use case
  2. Use ConcurrentHashMap for high-concurrency maps
  3. Use CopyOnWriteArrayList for read-heavy, write-rarely scenarios
  4. Prefer ConcurrentLinkedQueue for non-blocking operations
  5. Use BlockingQueue for producer-consumer scenarios
  6. Consider ConcurrentSkipListMap when you need sorted order
  7. Always understand the consistency guarantees of each collection

Concurrent collections provide efficient thread-safe alternatives to synchronized wrappers, offering better performance and scalability in multi-threaded applications.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper