Table of Contents
- Introduction to Inter-Thread Communication
- wait(), notify(), notifyAll()
- Producer-Consumer Pattern
- BlockingQueue Implementation
- Synchronized Methods and Blocks
- Lock and Condition
- Volatile Keyword
- Atomic Variables
- CountDownLatch
- CyclicBarrier
- Phaser
- Exchanger
- Semaphore
Introduction to Inter-Thread Communication
Inter-thread communication is the mechanism that allows synchronized threads to communicate with each other. It's essential for coordinating activities between threads and avoiding thread interference.
Why Inter-Thread Communication?
- Coordination: Threads need to coordinate their activities
- Resource Sharing: Safe sharing of resources between threads
- Efficiency: Avoid busy waiting, which burns CPU cycles without doing useful work
- Synchronization: Ensure proper ordering of operations
Basic Communication Problem
/**
 * Demonstrates the naive way to hand a value from one thread to another: the
 * consumer spins on a shared flag until the producer raises it.
 *
 * <p>The point of the example is that busy waiting wastes CPU. The flag is
 * {@code volatile} so that the spin loop is at least guaranteed to terminate;
 * without it the consumer is allowed to never observe the producer's write.
 */
public class BasicCommunicationProblem {
    /**
     * Payload handed to the consumer. It is written before, and read after,
     * the volatile {@link #isMessageReady} flag, so it is published by that flag.
     */
    private static String message;

    /** Raised by the producer once {@link #message} has been written. */
    private static volatile boolean isMessageReady = false;

    /**
     * Starts one producer and one busy-waiting consumer and waits for both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(2000); // Simulate work
                message = "Hello from Producer!";
                isMessageReady = true; // volatile write: also publishes message
                System.out.println("Producer: Message produced");
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });

        // Consumer thread - PROBLEM: Busy waiting!
        Thread consumer = new Thread(() -> {
            while (!isMessageReady) {
                // Busy waiting - wastes CPU cycles
            }
            System.out.println("Consumer: Received - " + message);
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
wait(), notify(), notifyAll()
Basic wait() and notify() Usage
/**
 * Smallest possible wait()/notify() hand-off: one thread parks on a shared
 * monitor until a second thread flips a guarded flag and notifies it.
 */
public class WaitNotifyBasics {
    /** Monitor that guards {@link #ready} and carries the wait/notify signal. */
    private static final Object monitor = new Object();

    /** Guarded by {@link #monitor}; set once by the notifying thread. */
    private static boolean ready = false;

    /**
     * Runs the waiting thread and the notifying thread and joins both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread waitingThread = new Thread(WaitNotifyBasics::awaitReady);
        Thread notifyingThread = new Thread(WaitNotifyBasics::announceReady);

        waitingThread.start();
        notifyingThread.start();

        try {
            waitingThread.join();
            notifyingThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Body of the waiting thread: blocks on the monitor until the flag is set. */
    private static void awaitReady() {
        synchronized (monitor) {
            System.out.println("Waiting thread: Acquired lock, checking condition");
            while (true) {
                if (ready) {
                    break;
                }
                try {
                    System.out.println("Waiting thread: Condition false, going to wait");
                    monitor.wait(); // Releases the monitor while parked
                    System.out.println("Waiting thread: Woke up, checking condition again");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Waiting thread: Condition is true, proceeding");
        }
    }

    /** Body of the notifying thread: simulates work, then sets the flag and notifies. */
    private static void announceReady() {
        try {
            Thread.sleep(2000); // Simulate work
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        synchronized (monitor) {
            System.out.println("Notifying thread: Acquired lock, changing condition");
            ready = true;
            monitor.notify(); // Wakes up one waiting thread
            System.out.println("Notifying thread: Notified waiting thread");
        }
    }
}
wait() and notifyAll() Example
/**
 * Shows {@code notifyAll()} waking several waiters that then compete for a
 * limited number of resources.
 *
 * <p>Five consumers compete for three resources. Consumers that find nothing
 * left after production has finished give up instead of waiting forever, so
 * the program terminates.
 */
public class WaitNotifyAllExample {
    /** Monitor guarding all static state of this example. */
    private static final Object lock = new Object();

    /** Number of resources currently available. Guarded by {@link #lock}. */
    private static int availableResources = 0;

    /** True once the producer will not publish anything more. Guarded by {@link #lock}. */
    private static boolean productionFinished = false;

    /** Thread that takes exactly one resource, or gives up if none will ever arrive. */
    static class ResourceConsumer extends Thread {
        private final int id;

        /**
         * @param id number used to build the thread name {@code Consumer-<id>}
         */
        public ResourceConsumer(int id) {
            this.id = id;
            setName("Consumer-" + id);
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(getName() + ": Waiting for resource");
                // Re-check in a loop: another consumer may have taken the last
                // resource between the notification and this thread re-acquiring the lock.
                while (availableResources == 0 && !productionFinished) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        System.out.println(getName() + ": Interrupted while waiting");
                        return;
                    }
                }
                if (availableResources == 0) {
                    System.out.println(getName() + ": No resource left, giving up");
                    return;
                }
                availableResources--;
                System.out.println(getName() + ": Got resource. Remaining: " + availableResources);
            }
        }
    }

    /** Thread that publishes three resources after a delay and wakes every waiter. */
    static class ResourceProducer extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000); // Simulate production time
                synchronized (lock) {
                    availableResources = 3;
                    System.out.println("Producer: Produced 3 resources");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the waiters, even if production was interrupted.
                synchronized (lock) {
                    productionFinished = true;
                    lock.notifyAll(); // Notify all waiting threads
                    System.out.println("Producer: Notified all waiting consumers");
                }
            }
        }
    }

    /**
     * Starts five consumers and one producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create multiple consumers
        for (int i = 1; i <= 5; i++) {
            new ResourceConsumer(i).start();
        }
        // Create producer
        new ResourceProducer().start();
    }
}
Common wait()/notify() Patterns
/**
 * Three recurring wait()/notify() shapes: a guarded block, a wait bounded by a
 * timeout, and several conditions sharing one monitor.
 */
public class WaitNotifyPatterns {

    /** Pattern 1: Guarded Block - an action that runs only once a flag is set. */
    static class GuardedBlock {
        private boolean condition = false;
        private final Object lock = new Object();

        /**
         * Blocks until {@link #makeConditionTrue()} has been called, then prints a message.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void doWhenCondition() throws InterruptedException {
            synchronized (lock) {
                while (!condition) {
                    lock.wait();
                }
                // Perform action when condition is true
                System.out.println("Condition met, performing action");
            }
        }

        /** Sets the flag and wakes every thread blocked in {@link #doWhenCondition()}. */
        public void makeConditionTrue() {
            synchronized (lock) {
                condition = true;
                lock.notifyAll();
            }
        }
    }

    /** Pattern 2: Timeout wait - wait for a flag, but only for a bounded time. */
    static class TimeoutWait {
        private final Object lock = new Object();
        private boolean ready = false;

        /**
         * Waits until {@link #setReady()} is called or the timeout elapses, then
         * prints which of the two happened.
         *
         * @param timeoutMs maximum time to wait in milliseconds; zero or a
         *                  negative value means "do not wait at all"
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitWithTimeout(long timeoutMs) throws InterruptedException {
            synchronized (lock) {
                // Use the monotonic clock so wall-clock adjustments cannot
                // shorten or lengthen the wait.
                long remainingNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                final long deadline = System.nanoTime() + remainingNanos;
                while (!ready && remainingNanos > 0) {
                    // timedWait never degrades to an untimed wait(0) for sub-millisecond remainders.
                    java.util.concurrent.TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                    remainingNanos = deadline - System.nanoTime();
                }
                if (ready) {
                    System.out.println("Condition met within timeout");
                } else {
                    System.out.println("Timeout occurred");
                }
            }
        }

        /**
         * Sets the flag and wakes every waiter. The flag stays set, so all
         * waiters may proceed; waking only one would leave the others asleep
         * until their timeout.
         */
        public void setReady() {
            synchronized (lock) {
                ready = true;
                lock.notifyAll();
            }
        }
    }

    /** Pattern 3: Multiple conditions - two independent flags guarded by one monitor. */
    static class MultipleConditions {
        private final Object lock = new Object();
        private boolean condition1 = false;
        private boolean condition2 = false;

        /**
         * Blocks until {@link #setCondition1()} has been called.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForCondition1() throws InterruptedException {
            synchronized (lock) {
                while (!condition1) {
                    lock.wait();
                }
                System.out.println("Condition 1 met");
            }
        }

        /**
         * Blocks until {@link #setCondition2()} has been called.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForCondition2() throws InterruptedException {
            synchronized (lock) {
                while (!condition2) {
                    lock.wait();
                }
                System.out.println("Condition 2 met");
            }
        }

        /** Sets the first flag. */
        public void setCondition1() {
            synchronized (lock) {
                condition1 = true;
                // Waiters for both flags share this monitor, so wake them all
                // and let each re-check its own flag.
                lock.notifyAll();
            }
        }

        /** Sets the second flag. */
        public void setCondition2() {
            synchronized (lock) {
                condition2 = true;
                lock.notifyAll();
            }
        }
    }

    /**
     * Exercises the guarded block and the timeout wait.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test Guarded Block
        GuardedBlock guardedBlock = new GuardedBlock();
        new Thread(() -> {
            try {
                guardedBlock.doWhenCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }).start();
        Thread.sleep(1000);
        guardedBlock.makeConditionTrue();

        // Test Timeout Wait
        TimeoutWait timeoutWait = new TimeoutWait();
        new Thread(() -> {
            try {
                timeoutWait.waitWithTimeout(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }).start();
        // Don't set ready - will timeout
        Thread.sleep(4000);
    }
}
Producer-Consumer Pattern
Basic Producer-Consumer with wait/notify
/**
 * Bounded-buffer producer/consumer built directly on wait()/notifyAll().
 *
 * <p>All buffer state is static and guarded by {@link #lock}. Two producers
 * each add ten items and two consumers each remove ten, so the run is balanced.
 */
public class ProducerConsumerWaitNotify {
    private static final int BUFFER_SIZE = 5;
    /** Ring buffer storage. Guarded by {@link #lock}. */
    private static final int[] buffer = new int[BUFFER_SIZE];
    /** Number of items currently stored. Guarded by {@link #lock}. */
    private static int count = 0;
    /** Next slot to write. Guarded by {@link #lock}. */
    private static int putIndex = 0;
    /** Next slot to read. Guarded by {@link #lock}. */
    private static int takeIndex = 0;
    private static final Object lock = new Object();

    /** Thread that puts the values 0..9 into the shared buffer. */
    static class Producer extends Thread {
        private final int id;

        /**
         * @param id number used to build the thread name {@code Producer-<id>}
         */
        public Producer(int id) {
            this.id = id;
            setName("Producer-" + id);
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    produce(i);
                }
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: restore the flag and end
                // the loop instead of carrying on with the next item.
                Thread.currentThread().interrupt();
                System.out.println(getName() + ": Interrupted, stopping");
            }
        }

        /**
         * Adds one item, blocking while the buffer is full.
         *
         * @param item value to store
         * @throws InterruptedException if interrupted while waiting for space
         */
        private void produce(int item) throws InterruptedException {
            synchronized (lock) {
                while (count == BUFFER_SIZE) {
                    System.out.println(getName() + ": Buffer full, waiting...");
                    lock.wait();
                }
                buffer[putIndex] = item;
                putIndex = (putIndex + 1) % BUFFER_SIZE;
                count++;
                System.out.println(getName() + ": Produced " + item + ", count: " + count);
                lock.notifyAll(); // Notify consumers
            }
        }
    }

    /** Thread that removes ten values from the shared buffer. */
    static class Consumer extends Thread {
        private final int id;

        /**
         * @param id number used to build the thread name {@code Consumer-<id>}
         */
        public Consumer(int id) {
            this.id = id;
            setName("Consumer-" + id);
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    consume();
                }
            } catch (InterruptedException e) {
                // See Producer.run(): stop on interrupt rather than swallow it.
                Thread.currentThread().interrupt();
                System.out.println(getName() + ": Interrupted, stopping");
            }
        }

        /**
         * Removes one item, blocking while the buffer is empty.
         *
         * @throws InterruptedException if interrupted while waiting for an item
         */
        private void consume() throws InterruptedException {
            synchronized (lock) {
                while (count == 0) {
                    System.out.println(getName() + ": Buffer empty, waiting...");
                    lock.wait();
                }
                int item = buffer[takeIndex];
                takeIndex = (takeIndex + 1) % BUFFER_SIZE;
                count--;
                System.out.println(getName() + ": Consumed " + item + ", count: " + count);
                lock.notifyAll(); // Notify producers
            }
        }
    }

    /**
     * Runs two producers against two consumers and waits for all of them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producer p1 = new Producer(1);
        Producer p2 = new Producer(2);
        Consumer c1 = new Consumer(1);
        Consumer c2 = new Consumer(2);
        p1.start();
        p2.start();
        c1.start();
        c2.start();
        try {
            p1.join();
            p2.join();
            c1.join();
            c2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Producer-Consumer completed");
    }
}
Advanced Producer-Consumer with Multiple Conditions
/**
 * Instance-based bounded buffer with several producers and consumers sharing
 * one monitor, printing the buffer contents after every operation.
 */
public class AdvancedProducerConsumer {
    private static final int BUFFER_SIZE = 3;

    // Demo sizing. The totals must match (2 * 6 == 3 * 4); otherwise some
    // consumer waits forever for an item that is never produced.
    private static final int PRODUCER_COUNT = 2;
    private static final int ITEMS_PER_PRODUCER = 6;
    private static final int CONSUMER_COUNT = 3;
    private static final int ITEMS_PER_CONSUMER = 4;

    /** Ring buffer storage. Guarded by {@link #lock}. */
    private final int[] buffer = new int[BUFFER_SIZE];
    /** Number of stored items. Guarded by {@link #lock}. */
    private int count = 0;
    /** Next slot to write. Guarded by {@link #lock}. */
    private int putIndex = 0;
    /** Next slot to read. Guarded by {@link #lock}. */
    private int takeIndex = 0;
    private final Object lock = new Object();

    /**
     * Stores one item, blocking while the buffer is full.
     *
     * @param item         value to store
     * @param producerName label used in the log output
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void produce(int item, String producerName) throws InterruptedException {
        synchronized (lock) {
            // Wait while buffer is full
            while (count == BUFFER_SIZE) {
                System.out.println(producerName + ": Buffer FULL - waiting...");
                lock.wait();
            }
            // Produce item
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % BUFFER_SIZE;
            count++;
            System.out.println(producerName + ": Produced " + item +
                    " | Buffer: [" + getBufferContents() + "]");
            // Producers and consumers wait on the same monitor, so wake them
            // all and let each re-check its own condition.
            lock.notifyAll();
        }
    }

    /**
     * Removes the oldest item, blocking while the buffer is empty.
     *
     * @param consumerName label used in the log output
     * @return the removed value
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public int consume(String consumerName) throws InterruptedException {
        synchronized (lock) {
            // Wait while buffer is empty
            while (count == 0) {
                System.out.println(consumerName + ": Buffer EMPTY - waiting...");
                lock.wait();
            }
            // Consume item
            int item = buffer[takeIndex];
            takeIndex = (takeIndex + 1) % BUFFER_SIZE;
            count--;
            System.out.println(consumerName + ": Consumed " + item +
                    " | Buffer: [" + getBufferContents() + "]");
            // Notify all waiting threads
            lock.notifyAll();
            return item;
        }
    }

    /**
     * Renders the stored items oldest-first, separated by {@code ", "}.
     * Callers must hold {@link #lock}.
     */
    private String getBufferContents() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            int index = (takeIndex + i) % BUFFER_SIZE;
            sb.append(buffer[index]);
            if (i < count - 1) {
                sb.append(", ");
            }
        }
        return sb.toString();
    }

    /**
     * Starts the producers and consumers of the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AdvancedProducerConsumer pc = new AdvancedProducerConsumer();
        // Create multiple producers
        for (int i = 1; i <= PRODUCER_COUNT; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < ITEMS_PER_PRODUCER; j++) {
                        pc.produce(producerId * 100 + j, "Producer-" + producerId);
                        Thread.sleep((long) (Math.random() * 1000));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }).start();
        }
        // Create multiple consumers
        for (int i = 1; i <= CONSUMER_COUNT; i++) {
            final int consumerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < ITEMS_PER_CONSUMER; j++) {
                        pc.consume("Consumer-" + consumerId);
                        Thread.sleep((long) (Math.random() * 1500));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
BlockingQueue Implementation
Custom BlockingQueue Implementation
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
 * A minimal bounded blocking FIFO queue built on an intrinsic lock with
 * wait()/notifyAll().
 *
 * <p>All methods synchronize on a private lock object, so instances may be
 * shared between threads.
 *
 * @param <T> element type
 */
public class CustomBlockingQueue<T> {
    /** Backing storage. Guarded by {@link #lock}. */
    private final Queue<T> queue;
    private final int capacity;
    private final Object lock = new Object();

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements the queue may hold
     * @throws IllegalArgumentException if {@code capacity} is not positive; a
     *         capacity of zero would make every {@code put} block forever and
     *         a negative one would silently disable the bound
     */
    public CustomBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    /**
     * Appends an element, blocking while the queue is full.
     *
     * @param item element to add
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(T item) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() >= capacity) {
                System.out.println(Thread.currentThread().getName() + ": Queue full, waiting...");
                lock.wait();
            }
            queue.offer(item);
            System.out.println(Thread.currentThread().getName() + ": Added " + item +
                    " | Size: " + queue.size());
            lock.notifyAll();
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the queue is empty.
     *
     * @return the removed element
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public T take() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": Queue empty, waiting...");
                lock.wait();
            }
            T item = queue.poll();
            System.out.println(Thread.currentThread().getName() + ": Removed " + item +
                    " | Size: " + queue.size());
            lock.notifyAll();
            return item;
        }
    }

    /**
     * Appends an element, waiting up to the given time for space.
     *
     * @param item    element to add
     * @param timeout how long to wait at most, in units of {@code unit}
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the element was added, {@code false} if the
     *         queue was still full when the timeout elapsed
     * @throws InterruptedException if interrupted while waiting for space
     */
    public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        synchronized (lock) {
            long remainingNanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + remainingNanos;
            while (queue.size() >= capacity) {
                if (remainingNanos <= 0) {
                    return false; // Timeout
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                // Recompute after every wake-up: it may have been spurious or
                // another producer may have taken the freed slot.
                remainingNanos = deadline - System.nanoTime();
            }
            queue.offer(item);
            lock.notifyAll();
            return true;
        }
    }

    /** @return the current number of elements */
    public int size() {
        synchronized (lock) {
            return queue.size();
        }
    }

    /** @return {@code true} if the queue currently holds no elements */
    public boolean isEmpty() {
        synchronized (lock) {
            return queue.isEmpty();
        }
    }
}
// Test the Custom BlockingQueue
/**
 * Drives a {@code CustomBlockingQueue} of capacity 3 with one fast producer
 * and one slower consumer, each handling ten elements.
 */
class BlockingQueueTest {

    /**
     * Starts the producer and consumer threads and waits for both to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(3);

        Thread producer = new Thread(() -> fill(queue), "Producer");
        Thread consumer = new Thread(() -> drain(queue), "Consumer");

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("BlockingQueue test completed");
    }

    /** Puts 1..10 into the queue, pausing 200 ms after each element. */
    private static void fill(CustomBlockingQueue<Integer> sink) {
        try {
            int next = 1;
            while (next <= 10) {
                sink.put(next);
                Thread.sleep(200);
                next++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Takes ten elements from the queue, pausing 500 ms after each one. */
    private static void drain(CustomBlockingQueue<Integer> source) {
        try {
            int taken = 0;
            while (taken < 10) {
                source.take();
                Thread.sleep(500);
                taken++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Synchronized Methods and Blocks
Synchronized Communication
/**
 * Thread communication through synchronized methods (a single-slot
 * producer/consumer) and synchronized blocks (a reader/writer gate).
 */
public class SynchronizedCommunication {

    /** Shared object offering a one-value hand-off and a reader/writer protocol. */
    static class SharedResource {
        /** Slot contents. Guarded by this object's monitor. */
        private int value = 0;
        /** Whether the slot holds an unconsumed value. Guarded by this object's monitor. */
        private boolean hasValue = false;

        /**
         * Stores a value, blocking while the previous one has not been consumed.
         *
         * @param newValue value to publish
         * @throws InterruptedException if interrupted while waiting for the slot
         */
        public synchronized void produce(int newValue) throws InterruptedException {
            while (hasValue) {
                System.out.println(Thread.currentThread().getName() + ": Resource has value, waiting...");
                wait();
            }
            value = newValue;
            hasValue = true;
            System.out.println(Thread.currentThread().getName() + ": Produced " + value);
            notifyAll(); // Notify all waiting consumers
        }

        /**
         * Takes the stored value, blocking while the slot is empty.
         *
         * @return the consumed value
         * @throws InterruptedException if interrupted while waiting for a value
         */
        public synchronized int consume() throws InterruptedException {
            while (!hasValue) {
                System.out.println(Thread.currentThread().getName() + ": No value available, waiting...");
                wait();
            }
            hasValue = false;
            System.out.println(Thread.currentThread().getName() + ": Consumed " + value);
            notifyAll(); // Notify all waiting producers
            return value;
        }

        // Reader/writer state lives behind its own monitor so it does not
        // contend with produce()/consume().
        private final Object rwLock = new Object();
        /** Number of reads in progress. Guarded by {@link #rwLock}. */
        private int readers = 0;
        /** True while a write is in progress. Guarded by {@link #rwLock}. */
        private boolean writerActive = false;

        /**
         * Performs a simulated read. Several reads may overlap, but a read
         * never overlaps a write. If the thread is interrupted while waiting
         * for a writer, the read is abandoned and the interrupt flag stays set.
         */
        public void readOperation() {
            synchronized (rwLock) {
                while (writerActive) {
                    try {
                        rwLock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                readers++;
                System.out.println(Thread.currentThread().getName() + ": Reading, readers: " + readers);
            }
            // Simulate read operation
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Still fall through: the reader count must be released.
                Thread.currentThread().interrupt();
            }
            synchronized (rwLock) {
                readers--;
                System.out.println(Thread.currentThread().getName() + ": Finished reading, readers: " + readers);
                if (readers == 0) {
                    rwLock.notifyAll(); // Notify waiting writers
                }
            }
        }

        /**
         * Performs a simulated write with exclusive access: it waits until no
         * read and no other write is in progress, and blocks new reads until
         * it has finished. Writers are not prioritised over readers.
         *
         * @throws InterruptedException if interrupted while waiting or writing
         */
        public void writeOperation() throws InterruptedException {
            synchronized (rwLock) {
                while (readers > 0 || writerActive) {
                    if (readers > 0) {
                        System.out.println(Thread.currentThread().getName() + ": Waiting for readers to finish");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ": Waiting for writer to finish");
                    }
                    rwLock.wait();
                }
                // Claim the write while still holding the monitor, so no
                // reader can slip in between the check and the write.
                writerActive = true;
            }
            try {
                System.out.println(Thread.currentThread().getName() + ": Writing");
                // Simulate write operation
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName() + ": Finished writing");
            } finally {
                synchronized (rwLock) {
                    writerActive = false;
                    rwLock.notifyAll(); // Release waiting readers and writers
                }
            }
        }
    }

    /**
     * Runs the producer/consumer pair together with three readers and two
     * writers and waits for all of them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        // Test producer-consumer with synchronized methods
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    resource.produce(i);
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    resource.consume();
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Consumer");
        producer.start();
        consumer.start();

        // Test reader-writer pattern
        Thread[] readersAndWriters = new Thread[5];
        int slot = 0;
        for (int i = 1; i <= 3; i++) {
            Thread reader = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    resource.readOperation();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                }
            }, "Reader-" + i);
            readersAndWriters[slot++] = reader;
            reader.start();
        }
        for (int i = 1; i <= 2; i++) {
            Thread writer = new Thread(() -> {
                try {
                    for (int j = 0; j < 2; j++) {
                        resource.writeOperation();
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }, "Writer-" + i);
            readersAndWriters[slot++] = writer;
            writer.start();
        }

        try {
            producer.join();
            consumer.join();
            // Wait for the workers themselves rather than guessing a sleep time.
            for (Thread worker : readersAndWriters) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Synchronized communication test completed");
    }
}
Lock and Condition
Using Lock and Condition for Communication
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Thread communication with explicit {@link Lock} and {@link Condition}
 * objects: a bounded message queue with separate "not full"/"not empty"
 * conditions, and a single-slot resource with a completion signal.
 */
public class LockConditionCommunication {
    /** Sentinel put by {@code main} to tell the consumer that no more messages follow. */
    private static final String END_OF_STREAM = "<END>";

    /** Bounded FIFO queue of strings guarded by one lock and two conditions. */
    static class MessageQueue {
        private final String[] messages;
        private int putIndex = 0;
        private int takeIndex = 0;
        private int count = 0;
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        /**
         * @param capacity maximum number of queued messages
         */
        public MessageQueue(int capacity) {
            messages = new String[capacity];
        }

        /**
         * Appends a message, blocking while the queue is full.
         *
         * @param message message to enqueue
         * @throws InterruptedException if interrupted while waiting for space
         */
        public void put(String message) throws InterruptedException {
            lock.lock();
            try {
                while (count == messages.length) {
                    System.out.println(Thread.currentThread().getName() + ": Queue full, waiting...");
                    notFull.await();
                }
                messages[putIndex] = message;
                putIndex = (putIndex + 1) % messages.length;
                count++;
                System.out.println(Thread.currentThread().getName() + ": Added '" + message +
                        "' | Size: " + count);
                notEmpty.signal(); // Signal waiting consumers
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the oldest message, blocking while the queue is empty.
         *
         * @return the removed message
         * @throws InterruptedException if interrupted while waiting for a message
         */
        public String take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    System.out.println(Thread.currentThread().getName() + ": Queue empty, waiting...");
                    notEmpty.await();
                }
                String message = messages[takeIndex];
                takeIndex = (takeIndex + 1) % messages.length;
                count--;
                System.out.println(Thread.currentThread().getName() + ": Removed '" + message +
                        "' | Size: " + count);
                notFull.signal(); // Signal waiting producers
                return message;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Appends a message, waiting at most {@code timeoutMs} for space.
         *
         * @param message   message to enqueue
         * @param timeoutMs maximum time to wait in milliseconds
         * @return {@code true} if the message was enqueued, {@code false} if
         *         the queue was still full when the timeout elapsed
         * @throws InterruptedException if interrupted while waiting for space
         */
        public boolean tryPut(String message, long timeoutMs) throws InterruptedException {
            long nanosLeft = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            lock.lock();
            try {
                // awaitNanos returns the time still remaining, so wake-ups
                // that find the queue full again do not restart the timeout.
                while (count == messages.length) {
                    if (nanosLeft <= 0L) {
                        return false; // Timeout
                    }
                    nanosLeft = notFull.awaitNanos(nanosLeft);
                }
                messages[putIndex] = message;
                putIndex = (putIndex + 1) % messages.length;
                count++;
                notEmpty.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Single-slot data hand-off with an explicit "no more data" state. */
    static class AdvancedResource {
        private final Lock lock = new ReentrantLock();
        private final Condition dataAvailable = lock.newCondition();
        private final Condition spaceAvailable = lock.newCondition();
        private final Condition operationComplete = lock.newCondition();
        private String data = null;
        private boolean completed = false;

        /**
         * Stores a value, blocking while the slot is occupied.
         *
         * @param newData value to publish
         * @throws InterruptedException if interrupted while waiting for the slot
         */
        public void produceData(String newData) throws InterruptedException {
            lock.lock();
            try {
                while (data != null) {
                    System.out.println(Thread.currentThread().getName() + ": Data exists, waiting for space");
                    spaceAvailable.await();
                }
                data = newData;
                System.out.println(Thread.currentThread().getName() + ": Produced data: " + data);
                dataAvailable.signal(); // Signal consumers
            } finally {
                lock.unlock();
            }
        }

        /**
         * Takes the stored value, blocking while the slot is empty and the
         * resource has not been marked completed.
         *
         * @return the consumed value, or {@code null} if the resource is
         *         completed and the slot is empty
         * @throws InterruptedException if interrupted while waiting for data
         */
        public String consumeData() throws InterruptedException {
            lock.lock();
            try {
                while (data == null && !completed) {
                    System.out.println(Thread.currentThread().getName() + ": No data, waiting...");
                    dataAvailable.await();
                }
                if (completed && data == null) {
                    return null; // No more data
                }
                String consumedData = data;
                data = null;
                System.out.println(Thread.currentThread().getName() + ": Consumed data: " + consumedData);
                spaceAvailable.signal(); // Signal producers
                return consumedData;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Marks the resource as completed and wakes both the consumers waiting
         * for data and the threads blocked in {@link #waitForCompletion()}.
         */
        public void markCompleted() {
            lock.lock();
            try {
                completed = true;
                dataAvailable.signalAll(); // Wake up all waiting consumers
                operationComplete.signalAll(); // Wake up waitForCompletion() callers
            } finally {
                lock.unlock();
            }
        }

        /**
         * Blocks until {@link #markCompleted()} has been called.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public void waitForCompletion() throws InterruptedException {
            lock.lock();
            try {
                while (!completed) {
                    operationComplete.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Runs two producers (one blocking, one with a timeout) against a single
     * consumer on a queue of capacity 3.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Test MessageQueue with Lock and Condition
        MessageQueue queue = new MessageQueue(3);
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put("Message-" + i + "-from-P1");
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer-1");
        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    boolean success = queue.tryPut("Message-" + i + "-from-P2", 1000);
                    if (!success) {
                        System.out.println("Producer-2: Failed to put message (timeout)");
                    }
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer-2");
        // The number of delivered messages is not known up front, because a
        // tryPut may time out. The consumer therefore runs until it sees the
        // end-of-stream sentinel instead of counting to a fixed total.
        Thread consumer = new Thread(() -> {
            try {
                while (!END_OF_STREAM.equals(queue.take())) {
                    Thread.sleep(400);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Consumer");
        producer1.start();
        producer2.start();
        consumer.start();
        try {
            producer1.join();
            producer2.join();
            queue.put(END_OF_STREAM);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Lock and Condition test completed");
    }
}
Volatile Keyword
Volatile for Visibility
/**
 * Shows what {@code volatile} does and does not provide: it makes writes
 * visible to other threads, but it does not make compound operations atomic.
 */
public class VolatileCommunication {

    /** A one-way flag that one thread sets and another thread polls. */
    static class SharedFlag {
        private volatile boolean flag = false;
        /** Progress counter touched only by the polling thread. */
        private int counter = 0;

        /** Raises the flag and logs it. */
        public void setFlag() {
            flag = true;
            System.out.println(Thread.currentThread().getName() + ": Flag set to true");
        }

        /**
         * Spins until the flag is raised. The loop still occupies a CPU the
         * whole time; {@code volatile} only guarantees that it will see the
         * write and terminate.
         */
        public void waitForFlag() {
            while (!flag) {
                // Busy wait - but with volatile, changes are visible
                counter++; // Only used to throttle the progress message below
                if (counter % 1000000 == 0) {
                    System.out.println(Thread.currentThread().getName() + ": Still waiting...");
                }
            }
            System.out.println(Thread.currentThread().getName() + ": Flag is true, proceeding");
        }

        /** @return the current flag value */
        public boolean getFlag() {
            return flag;
        }
    }

    /**
     * Deliberately broken counter: the field is volatile, but {@code count++}
     * is a read-modify-write, so concurrent increments can be lost.
     */
    static class VolatileCounter {
        private volatile int count = 0;

        // Volatile ensures visibility but not atomicity
        public void increment() {
            count++; // This is not atomic!
        }

        /** @return the current count */
        public int getCount() {
            return count;
        }
    }

    /** Polls for the most recently submitted data item until shutdown is requested. */
    static class DataProcessor {
        private volatile boolean shutdownRequested = false;

        /**
         * Most recent unprocessed item. "Read, then clear" on a plain volatile
         * field is two separate steps, and a submission arriving between them
         * would be erased unprocessed. getAndSet performs both as one atomic
         * step.
         */
        private final java.util.concurrent.atomic.AtomicReference<String> latestData =
                new java.util.concurrent.atomic.AtomicReference<>();

        /**
         * Processing loop: takes the pending item (if any) every 100 ms until
         * shutdown is requested or the thread is interrupted.
         */
        public void processData() {
            while (!shutdownRequested) {
                String data = latestData.getAndSet(null); // Take and clear atomically
                if (data != null) {
                    System.out.println(Thread.currentThread().getName() + ": Processing: " + data);
                }
                // Small sleep to prevent busy waiting
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println(Thread.currentThread().getName() + ": Shutdown completed");
        }

        /**
         * Replaces the pending item; an item not yet picked up is overwritten.
         *
         * @param data item to process
         */
        public void submitData(String data) {
            latestData.set(data);
            System.out.println(Thread.currentThread().getName() + ": Submitted: " + data);
        }

        /** Asks the processing loop to stop after its current iteration. */
        public void requestShutdown() {
            shutdownRequested = true;
            System.out.println("Shutdown requested");
        }
    }

    /**
     * Runs the three volatile demonstrations in sequence.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Basic volatile flag
        System.out.println("=== Test 1: Volatile Flag ===");
        SharedFlag sharedFlag = new SharedFlag();
        Thread waiter = new Thread(() -> {
            sharedFlag.waitForFlag();
        }, "Waiter");
        Thread setter = new Thread(() -> {
            try {
                Thread.sleep(2000);
                sharedFlag.setFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Setter");
        waiter.start();
        setter.start();
        waiter.join();
        setter.join();

        // Test 2: Volatile counter (demonstrates limitation)
        System.out.println("\n=== Test 2: Volatile Counter (Non-atomic) ===");
        VolatileCounter counter = new VolatileCounter();
        Thread[] incrementers = new Thread[10];
        for (int i = 0; i < incrementers.length; i++) {
            incrementers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            incrementers[i].start();
        }
        for (Thread t : incrementers) {
            t.join();
        }
        System.out.println("Expected count: 10000");
        System.out.println("Actual count: " + counter.getCount());
        System.out.println("Volatile ensures visibility but not atomicity!");

        // Test 3: Data processor with volatile
        System.out.println("\n=== Test 3: Data Processor ===");
        DataProcessor processor = new DataProcessor();
        Thread processorThread = new Thread(() -> {
            processor.processData();
        }, "Processor");
        processorThread.start();
        // Submit some data
        for (int i = 1; i <= 5; i++) {
            processor.submitData("Data-" + i);
            Thread.sleep(500);
        }
        // Request shutdown
        processor.requestShutdown();
        processorThread.join();
        System.out.println("All volatile tests completed");
    }
}
Atomic Variables
Atomic Variables for Lock-Free Communication
import java.util.concurrent.atomic.*;
/**
 * Lock-free communication through the {@code java.util.concurrent.atomic}
 * classes: plain atomic counters, an atomic status board, and a stack built
 * on compare-and-set.
 */
public class AtomicCommunication {

    /** Bundle of independently updated atomic values. */
    static class AtomicCounter {
        private final AtomicInteger count = new AtomicInteger(0);
        private final AtomicLong total = new AtomicLong(0);
        private final AtomicReference<String> latestMessage = new AtomicReference<>("Initial");

        /** Adds one to the count. */
        public void increment() {
            count.incrementAndGet();
        }

        /**
         * Adds {@code value} to the running total.
         *
         * @param value amount to add
         */
        public void addToTotal(int value) {
            total.addAndGet(value);
        }

        /**
         * Unconditionally replaces the message.
         *
         * @param message new message
         */
        public void setMessage(String message) {
            latestMessage.set(message);
        }

        /**
         * Replaces the message only if the current one is {@code expected}.
         *
         * @param expected   message that must currently be stored
         * @param newMessage replacement
         * @return whether the replacement took place
         */
        public boolean compareAndSetMessage(String expected, String newMessage) {
            return latestMessage.compareAndSet(expected, newMessage);
        }

        /** Prints count, total and message on one line. */
        public void printState() {
            String line = "Count: " + count.get()
                    + ", Total: " + total.get()
                    + ", Message: " + latestMessage.get();
            System.out.println(line);
        }
    }

    /** Tracks how many workers are active and the last reported status of each. */
    static class AtomicCoordinator {
        private final AtomicInteger activeWorkers = new AtomicInteger(0);
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private final AtomicReferenceArray<String> workerStatus;

        /**
         * @param numWorkers number of status slots; each starts as "Initialized"
         */
        public AtomicCoordinator(int numWorkers) {
            workerStatus = new AtomicReferenceArray<>(numWorkers);
            int slot = 0;
            while (slot < numWorkers) {
                workerStatus.set(slot, "Initialized");
                slot++;
            }
        }

        /**
         * Records that a worker has started.
         *
         * @param workerId index of the worker's status slot
         */
        public void workerStarted(int workerId) {
            activeWorkers.incrementAndGet();
            workerStatus.set(workerId, "Running");
            int activeNow = activeWorkers.get();
            System.out.println("Worker " + workerId + " started. Active: " + activeNow);
        }

        /**
         * Records that a worker has finished.
         *
         * @param workerId index of the worker's status slot
         */
        public void workerCompleted(int workerId) {
            final int stillActive = activeWorkers.decrementAndGet();
            workerStatus.set(workerId, "Completed");
            System.out.println("Worker " + workerId + " completed. Remaining: " + stillActive);
        }

        /** Raises the shutdown flag and logs it. */
        public void requestShutdown() {
            shutdownRequested.set(true);
            System.out.println("Shutdown requested");
        }

        /** @return whether shutdown has been requested */
        public boolean shouldShutdown() {
            return shutdownRequested.get();
        }

        /** Prints every status slot as a bracketed, comma-separated list. */
        public void printStatus() {
            System.out.print("Worker status: [");
            final int slots = workerStatus.length();
            for (int slot = 0; slot < slots; slot++) {
                if (slot > 0) {
                    System.out.print(", ");
                }
                System.out.print(workerStatus.get(slot));
            }
            System.out.println("]");
        }
    }

    /**
     * Stack whose head pointer is swapped with compare-and-set; no locks are taken.
     *
     * @param <T> element type
     */
    static class LockFreeStack<T> {
        /** Singly linked node. */
        private static class Node<T> {
            final T value;
            Node<T> next;

            Node(T value) {
                this.value = value;
            }
        }

        private final AtomicReference<Node<T>> top = new AtomicReference<>();

        /**
         * Pushes a value, retrying until the head swap succeeds.
         *
         * @param value value to push
         */
        public void push(T value) {
            Node<T> fresh = new Node<>(value);
            while (true) {
                Node<T> current = top.get();
                fresh.next = current;
                if (top.compareAndSet(current, fresh)) {
                    break;
                }
            }
            System.out.println(Thread.currentThread().getName() + ": Pushed " + value);
        }

        /**
         * Pops the most recently pushed value, retrying until the head swap succeeds.
         *
         * @return the popped value, or {@code null} if the stack was empty
         */
        public T pop() {
            while (true) {
                Node<T> current = top.get();
                if (current == null) {
                    return null;
                }
                if (top.compareAndSet(current, current.next)) {
                    System.out.println(Thread.currentThread().getName() + ": Popped " + current.value);
                    return current.value;
                }
            }
        }

        /** @return whether the stack currently has no elements */
        public boolean isEmpty() {
            return top.get() == null;
        }
    }

    /**
     * Runs the counter, coordinator and stack demonstrations in sequence.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        runCounterDemo();
        runCoordinatorDemo();
        runStackDemo();
        System.out.println("All atomic tests completed");
    }

    /** Test 1: five threads each apply 1000 increments and additions, then the state is printed. */
    private static void runCounterDemo() throws InterruptedException {
        System.out.println("=== Test 1: Atomic Counter ===");
        AtomicCounter counter = new AtomicCounter();
        Thread[] workers = new Thread[5];
        for (int w = 0; w < workers.length; w++) {
            Thread worker = new Thread(() -> {
                for (int step = 0; step < 1000; step++) {
                    counter.increment();
                    counter.addToTotal(step);
                }
            });
            workers[w] = worker;
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        counter.printState();
    }

    /** Test 2: three staggered workers report in; the status is printed after two seconds. */
    private static void runCoordinatorDemo() throws InterruptedException {
        System.out.println("\n=== Test 2: Atomic Coordinator ===");
        AtomicCoordinator coordinator = new AtomicCoordinator(3);
        for (int w = 0; w < 3; w++) {
            final int workerId = w;
            Runnable job = () -> {
                coordinator.workerStarted(workerId);
                try {
                    Thread.sleep(1000 + workerId * 500);
                    coordinator.workerCompleted(workerId);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            new Thread(job).start();
        }
        Thread.sleep(2000);
        coordinator.printStatus();
    }

    /** Test 3: two pushers and one popper share a lock-free stack. */
    private static void runStackDemo() throws InterruptedException {
        System.out.println("\n=== Test 3: Lock-Free Stack ===");
        LockFreeStack<Integer> stack = new LockFreeStack<>();
        Thread pusher1 = new Thread(() -> pushRange(stack, 1, 5, 100), "Pusher-1");
        Thread pusher2 = new Thread(() -> pushRange(stack, 6, 10, 150), "Pusher-2");
        Thread popper = new Thread(() -> {
            int attemptsLeft = 10;
            while (attemptsLeft-- > 0) {
                if (stack.pop() == null) {
                    continue; // Nothing to pop: retry immediately
                }
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Popper");
        pusher1.start();
        pusher2.start();
        popper.start();
        pusher1.join();
        pusher2.join();
        popper.join();
        System.out.println("Stack empty: " + stack.isEmpty());
    }

    /** Pushes {@code first..last} inclusive, pausing {@code pauseMs} after each push. */
    private static void pushRange(LockFreeStack<Integer> stack, int first, int last, long pauseMs) {
        for (int value = first; value <= last; value++) {
            stack.push(value);
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
CountDownLatch
CountDownLatch for Coordination
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchCommunication {
static class ServiceInitializer {
private final CountDownLatch initializationLatch;
private final CountDownLatch shutdownLatch;
private volatile boolean initialized = false;
public ServiceInitializer(int numServices) {
this.initializationLatch = new CountDownLatch(numServices);
this.shutdownLatch = new CountDownLatch(numServices);
}
public void serviceReady(String serviceName) {
initializationLatch.countDown();
System.out.println(serviceName + " is ready. Remaining: " + initializationLatch.getCount());
}
public void serviceStopped(String serviceName) {
shutdownLatch.countDown();
System.out.println(serviceName + " stopped. Remaining: " + shutdownLatch.getCount());
}
public void waitForInitialization() throws InterruptedException {
System.out.println("Waiting for all services to initialize...");
initializationLatch.await();
initialized = true;
System.out.println("All services initialized! Starting main operation.");
}
public void waitForShutdown() throws InterruptedException {
System.out.println("Waiting for all services to shutdown...");
shutdownLatch.await();
System.out.println("All services shutdown completed.");
}
public boolean isInitialized() {
return initialized;
}
}
/**
 * Coordinates a group of workers with two latches: a one-shot start gate that releases
 * every waiting worker at once, and a completion latch the coordinator can wait on.
 */
static class ParallelProcessor {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final int numWorkers;

    /**
     * @param numWorkers number of workers expected to call {@link #workerReady(int)};
     *                   used as the initial count of the completion latch
     */
    public ParallelProcessor(int numWorkers) {
        this.numWorkers = numWorkers;
        this.startSignal = new CountDownLatch(1);
        this.doneSignal = new CountDownLatch(numWorkers);
    }

    /** Opens the start gate, releasing every worker blocked in {@link #workerReady(int)}. */
    public void startProcessing() {
        System.out.println("Starting all workers...");
        startSignal.countDown(); // Release all workers
    }

    /**
     * Blocks until every worker has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForCompletion() throws InterruptedException {
        doneSignal.await(); // Wait for all workers to complete
        System.out.println("All workers completed!");
    }

    /**
     * Waits for the workers for at most the given time and reports how many finished.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        boolean completed = doneSignal.await(timeout, unit);
        if (completed) {
            System.out.println("All workers completed within timeout!");
        } else {
            System.out.println("Timeout! Some workers still running. Completed: " +
                    (numWorkers - doneSignal.getCount()) + "/" + numWorkers);
        }
    }

    /**
     * Worker entry point: waits for the start gate, simulates work, then reports completion.
     *
     * <p>The completion latch is counted down in a {@code finally} block so that a worker
     * that is interrupted (while waiting for the gate or while working) still reports in;
     * otherwise {@link #waitForCompletion()} would block forever.
     *
     * @param workerId identifier used in log output and to vary the simulated work time
     * @throws InterruptedException if the worker thread is interrupted
     */
    public void workerReady(int workerId) throws InterruptedException {
        try {
            System.out.println("Worker " + workerId + " waiting for start signal...");
            startSignal.await(); // Wait for start signal
            System.out.println("Worker " + workerId + " started processing");
            // Simulate work
            Thread.sleep(1000L + workerId * 200L);
            System.out.println("Worker " + workerId + " completed");
        } finally {
            doneSignal.countDown();
        }
    }
}
/**
 * Coordinates a race with three latches: racers report ready, the coordinator fires a
 * single start signal, and every racer reports when it has finished.
 */
static class RaceCoordinator {
    private final CountDownLatch readySignal;
    private final CountDownLatch startSignal;
    private final CountDownLatch finishSignal;

    /**
     * @param numRacers number of racers expected to report ready and to finish
     */
    public RaceCoordinator(int numRacers) {
        this.readySignal = new CountDownLatch(numRacers);
        this.startSignal = new CountDownLatch(1);
        this.finishSignal = new CountDownLatch(numRacers);
    }

    /**
     * Records that a racer is ready and logs how many racers are still awaited.
     *
     * @param racerName name printed in the log line
     */
    public void racerReady(String racerName) {
        readySignal.countDown();
        System.out.println(racerName + " is ready. Waiting: " + readySignal.getCount());
    }

    /**
     * Waits until every racer is ready, pauses three seconds, then fires the start signal.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting or sleeping
     */
    public void startRace() throws InterruptedException {
        System.out.println("Waiting for all racers to be ready...");
        readySignal.await();
        System.out.println("All racers ready! Starting race in 3 seconds...");
        Thread.sleep(3000);
        System.out.println("GO!!!");
        startSignal.countDown();
    }

    /**
     * Racer entry point: waits for the start signal, simulates a run of 2-5 seconds and
     * reports the finish.
     *
     * <p>The finish latch is counted down in a {@code finally} block so that an interrupted
     * racer cannot leave {@link #waitForRaceFinish()} blocked forever.
     *
     * @param racerName name printed in the log lines
     * @throws InterruptedException if the racer thread is interrupted
     */
    public void waitForRacer(String racerName) throws InterruptedException {
        try {
            startSignal.await(); // Wait for start signal
            System.out.println(racerName + " is running...");
            // Simulate race time
            long raceTime = 2000 + (long) (Math.random() * 3000);
            Thread.sleep(raceTime);
            System.out.println(racerName + " finished in " + raceTime + "ms");
        } finally {
            finishSignal.countDown();
        }
    }

    /**
     * Blocks until every racer has reported its finish.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForRaceFinish() throws InterruptedException {
        finishSignal.await();
        System.out.println("Race completed! All racers finished.");
    }
}
/**
 * Runs the three CountDownLatch demos one after another: service start-up/shutdown,
 * gated parallel workers, and a coordinated race.
 *
 * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
 */
public static void main(String[] args) throws InterruptedException {
    // Test 1: Service Initialization
    System.out.println("=== Test 1: Service Initialization ===");
    final int serviceCount = 3;
    ServiceInitializer services = new ServiceInitializer(serviceCount);
    for (int n = 1; n <= serviceCount; n++) {
        final int serviceId = n;
        final String serviceName = "Service-" + serviceId;
        Runnable lifecycle = () -> {
            try {
                Thread.sleep(serviceId * 1000); // Simulated start-up time grows with the id
                services.serviceReady(serviceName);
                Thread.sleep(2000); // Simulated running time before the service stops
                services.serviceStopped(serviceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        new Thread(lifecycle).start();
    }
    services.waitForInitialization();
    System.out.println("Main operation running...");
    Thread.sleep(1000);
    services.waitForShutdown();

    // Test 2: Parallel Processing
    System.out.println("\n=== Test 2: Parallel Processing ===");
    final int workerCount = 4;
    ParallelProcessor workers = new ParallelProcessor(workerCount);
    for (int n = 1; n <= workerCount; n++) {
        final int workerId = n;
        Thread worker = new Thread(() -> {
            try {
                workers.workerReady(workerId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        worker.start();
    }
    Thread.sleep(1000); // Give the workers time to reach the start gate
    workers.startProcessing();
    workers.waitForCompletion();

    // Test 3: Race Coordination
    System.out.println("\n=== Test 3: Race Coordination ===");
    final int racerCount = 4;
    RaceCoordinator coordinator = new RaceCoordinator(racerCount);
    for (int n = 1; n <= racerCount; n++) {
        final String racerName = "Racer-" + n;
        Thread racer = new Thread(() -> {
            try {
                coordinator.racerReady(racerName);
                coordinator.waitForRacer(racerName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        racer.start();
    }
    coordinator.startRace();
    coordinator.waitForRaceFinish();

    System.out.println("All CountDownLatch tests completed");
}
}
CyclicBarrier
CyclicBarrier for Synchronization
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Demonstrations of {@link CyclicBarrier}: phased matrix processing, tournament rounds
 * with a timed wait, partial-result aggregation in the barrier action, and a barrier
 * that is reset after a worker drops out.
 */
public class CyclicBarrierCommunication {

    /** Runs a fixed number of workers through three phases, meeting at a barrier after each. */
    static class MatrixProcessor {
        private final CyclicBarrier barrier;
        private final int[][] matrix;
        private final int numWorkers;

        /**
         * @param matrix     data set the workers nominally operate on (only stored here)
         * @param numWorkers number of parties that must reach the barrier in each phase
         */
        public MatrixProcessor(int[][] matrix, int numWorkers) {
            this.matrix = matrix;
            this.numWorkers = numWorkers;
            this.barrier = new CyclicBarrier(numWorkers, () -> {
                System.out.println("All workers completed current phase! Starting next phase...");
            });
        }

        /**
         * Simulates three processing phases for one worker, waiting at the barrier after each.
         *
         * @param workerId identifier used in log output and to vary the simulated work time
         * @param startRow first row of the worker's range (used for logging only)
         * @param endRow   last row of the worker's range (used for logging only)
         */
        public void processRow(int workerId, int startRow, int endRow) {
            try {
                for (int phase = 1; phase <= 3; phase++) {
                    System.out.println("Worker " + workerId + " starting phase " + phase +
                            " (rows " + startRow + "-" + endRow + ")");
                    // Simulate processing
                    Thread.sleep(1000 + workerId * 100);
                    System.out.println("Worker " + workerId + " completed phase " + phase +
                            ", waiting at barrier...");
                    // Wait for all workers to complete this phase
                    barrier.await();
                    System.out.println("Worker " + workerId + " passed barrier for phase " + phase);
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Synchronizes players at the end of every round; the barrier action advances the round. */
    static class TournamentOrganizer {
        private final CyclicBarrier roundBarrier;
        private final int numPlayers;
        // Only modified inside the barrier action, which runs while all players are parked.
        private int currentRound = 1;

        /**
         * @param numPlayers number of players that must finish a round before it is closed
         */
        public TournamentOrganizer(int numPlayers) {
            this.numPlayers = numPlayers;
            this.roundBarrier = new CyclicBarrier(numPlayers, () -> {
                System.out.println("\n=== Round " + currentRound + " completed ===");
                currentRound++;
            });
        }

        /**
         * Plays one round for the given player and waits up to five seconds for the others.
         * A player whose wait fails (timeout, broken barrier or interrupt) is reported as
         * eliminated.
         *
         * @param playerName name printed in the log lines
         */
        public void playRound(String playerName) {
            try {
                System.out.println(playerName + " preparing for round " + currentRound);
                Thread.sleep(500 + (long) (Math.random() * 1000));
                System.out.println(playerName + " completed round " + currentRound +
                        ", waiting for others...");
                roundBarrier.await(5, TimeUnit.SECONDS); // Timeout to prevent deadlock
                System.out.println(playerName + " advancing to next round");
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                System.out.println(playerName + " eliminated: " + e.getClass().getSimpleName());
            }
        }
    }

    /** Sums per-worker partial results in the barrier action so all workers see the total. */
    static class DataAggregator {
        private final CyclicBarrier aggregationBarrier;
        private final double[] partialResults;
        // Written by the barrier action; read by workers only after await() has returned.
        private double finalResult = 0;

        /**
         * @param numWorkers number of workers; also the length of the partial-result array
         */
        public DataAggregator(int numWorkers) {
            this.partialResults = new double[numWorkers];
            this.aggregationBarrier = new CyclicBarrier(numWorkers, () -> {
                // This runs when all threads reach the barrier
                aggregateResults();
            });
        }

        /**
         * Sums one chunk, stores the sum in this worker's slot and waits for the aggregate.
         *
         * @param workerId  index of this worker's slot in the partial-result array
         * @param dataChunk values to sum
         */
        public void processData(int workerId, double[] dataChunk) {
            try {
                // Phase 1: Process data chunk
                double partialSum = 0;
                for (double value : dataChunk) {
                    partialSum += value;
                }
                partialResults[workerId] = partialSum;
                System.out.println("Worker " + workerId + " computed partial sum: " + partialSum);
                aggregationBarrier.await(); // Wait for all workers
                // Phase 2: All workers can now use the final result
                System.out.println("Worker " + workerId + " using final result: " + finalResult);
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        /** Recomputes {@code finalResult} as the sum of all partial results. */
        private void aggregateResults() {
            finalResult = 0;
            for (double result : partialResults) {
                finalResult += result;
            }
            System.out.println("Aggregated final result: " + finalResult);
        }
    }

    /**
     * Barrier demo in which a worker may randomly drop out.
     *
     * <p>The party count of a {@link CyclicBarrier} is fixed at construction, so a worker
     * that drops out leaves the remaining workers parked in {@code await()} until
     * {@link #reset()} breaks the barrier.
     */
    static class ResilientBarrier {
        private final CyclicBarrier barrier;
        private final int totalWorkers;
        // Guarded by "this": decremented by worker threads, restored by reset(),
        // and read by the barrier action, all of which may run on different threads.
        private int activeWorkers;

        /**
         * @param totalWorkers number of parties the barrier waits for
         */
        public ResilientBarrier(int totalWorkers) {
            this.totalWorkers = totalWorkers;
            this.activeWorkers = totalWorkers;
            this.barrier = new CyclicBarrier(totalWorkers, () -> {
                System.out.println("Barrier tripped! Active workers: " + getActiveWorkers());
            });
        }

        /**
         * Runs three iterations of simulated work, each followed by a barrier wait. In every
         * iteration the worker has a 10% chance of failing and returning early.
         *
         * @param workerName name printed in the log lines
         */
        public void doWork(String workerName) {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(workerName + " working on iteration " + i);
                    Thread.sleep(1000);
                    // Simulate occasional worker failure
                    if (Math.random() < 0.1) { // 10% chance of failure
                        System.out.println(workerName + " failed! Removing from barrier.");
                        markWorkerFailed();
                        return;
                    }
                    System.out.println(workerName + " waiting at barrier (iteration " + i + ")");
                    barrier.await();
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.println(workerName + " interrupted: " + e.getMessage());
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Breaks any current wait, re-arms the barrier and restores the active-worker count. */
        public void reset() {
            barrier.reset();
            synchronized (this) {
                activeWorkers = totalWorkers;
            }
            System.out.println("Barrier reset to " + totalWorkers + " workers");
        }

        /** Atomically records that one worker has dropped out. */
        private synchronized void markWorkerFailed() {
            activeWorkers--;
        }

        /** Returns the current active-worker count under the same lock as the writers. */
        private synchronized int getActiveWorkers() {
            return activeWorkers;
        }
    }

    /**
     * Runs the four CyclicBarrier demos in sequence, sleeping between them to let the
     * worker threads finish.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Matrix Processing
        System.out.println("=== Test 1: Matrix Processing ===");
        int[][] matrix = new int[10][10];
        MatrixProcessor processor = new MatrixProcessor(matrix, 3);
        for (int i = 0; i < 3; i++) {
            final int workerId = i;
            final int startRow = i * 3;
            final int endRow = startRow + 2;
            new Thread(() -> {
                processor.processRow(workerId, startRow, endRow);
            }).start();
        }
        Thread.sleep(5000);

        // Test 2: Tournament
        System.out.println("\n=== Test 2: Tournament ===");
        TournamentOrganizer tournament = new TournamentOrganizer(4);
        for (int i = 1; i <= 4; i++) {
            final String playerName = "Player-" + i;
            new Thread(() -> {
                for (int round = 1; round <= 3; round++) {
                    tournament.playRound(playerName);
                }
            }).start();
        }
        Thread.sleep(8000);

        // Test 3: Data Aggregation
        System.out.println("\n=== Test 3: Data Aggregation ===");
        DataAggregator aggregator = new DataAggregator(3);
        for (int i = 0; i < 3; i++) {
            final int workerId = i;
            double[] dataChunk = {1.0, 2.0, 3.0, 4.0, 5.0};
            new Thread(() -> {
                aggregator.processData(workerId, dataChunk);
            }).start();
        }
        Thread.sleep(3000);

        // Test 4: Resilient Barrier
        System.out.println("\n=== Test 4: Resilient Barrier ===");
        ResilientBarrier resilient = new ResilientBarrier(3);
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Worker-" + i;
            new Thread(() -> {
                resilient.doWork(workerName);
            }).start();
        }
        Thread.sleep(5000);
        resilient.reset();

        System.out.println("All CyclicBarrier tests completed");
    }
}
Phaser
Phaser for Flexible Synchronization
import java.util.concurrent.Phaser;
/**
 * Demonstrations of {@link Phaser}: a fixed number of phases, a coordinator with
 * dynamically registering workers, two tiers gated by an explicit start signal, and a
 * phaser subclass that logs every advance.
 */
public class PhaserCommunication {

    /** Runs a fixed set of workers through {@code numPhases} phases, then terminates. */
    static class MultiPhaseProcessor {
        private final Phaser phaser;
        private final int numPhases;

        /**
         * @param numWorkers number of parties registered with the phaser
         * @param numPhases  number of phases after which the phaser terminates
         */
        public MultiPhaseProcessor(int numWorkers, int numPhases) {
            this.numPhases = numPhases;
            this.phaser = new Phaser(numWorkers) {
                @Override
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("Phase " + phase + " completed. " +
                            "Registered parties: " + registeredParties);
                    // Returning true terminates the phaser after the last phase.
                    return phase >= numPhases - 1 || registeredParties == 0;
                }
            };
        }

        /**
         * Works through the phases until the phaser terminates or the thread is interrupted.
         *
         * @param workerName name printed in the log lines
         */
        public void processPhase(String workerName) {
            int phase;
            // getPhase() is negative once the phaser has terminated. Without the lower
            // bound a negative value would still satisfy "< numPhases" and the worker
            // would spin forever after the final phase.
            while ((phase = phaser.getPhase()) >= 0 && phase < numPhases) {
                System.out.println(workerName + " starting phase " + phase);
                try {
                    // Simulate work
                    Thread.sleep(500 + (long) (Math.random() * 500));
                    System.out.println(workerName + " completed phase " + phase);
                    // Wait for all workers to complete this phase
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    System.out.println(workerName + " interrupted, deregistering");
                    phaser.arriveAndDeregister();
                    break;
                }
            }
            System.out.println(workerName + " completed all phases");
        }
    }

    /**
     * A coordinator plus workers that register themselves at run time. The coordinator
     * takes part in three phases and then terminates the phaser, which ends the workers.
     */
    static class DynamicWorkforce {
        private final Phaser phaser;

        /** Creates the phaser with one pre-registered party for the coordinator. */
        public DynamicWorkforce() {
            this.phaser = new Phaser(1); // Register self
        }

        /**
         * Registers one additional party. Note that {@link #workerTask(String)} registers
         * itself, so this is only for parties that do not go through {@code workerTask}.
         *
         * @param workerName name printed in the log line
         */
        public void addWorker(String workerName) {
            phaser.register();
            System.out.println("Added " + workerName + ". Total workers: " + phaser.getRegisteredParties());
        }

        /**
         * Arrives and deregisters one party on behalf of the caller. The name is used for
         * logging only; no specific worker thread is stopped by this call.
         *
         * @param workerName name printed in the log line
         */
        public void removeWorker(String workerName) {
            phaser.arriveAndDeregister();
            System.out.println("Removed " + workerName + ". Total workers: " + phaser.getRegisteredParties());
        }

        /**
         * Takes part in three phases as the coordinator and then terminates the phaser.
         *
         * <p>Termination happens in a {@code finally} block: the coordinator is a registered
         * party, so if it simply stopped arriving, every worker would block forever in
         * {@code arriveAndAwaitAdvance()} and the loop in {@link #workerTask(String)} would
         * never observe termination.
         *
         * @param coordinatorName name printed in the log lines
         */
        public void coordinateWork(String coordinatorName) {
            try {
                for (int phase = 0; phase < 3; phase++) {
                    System.out.println(coordinatorName + " starting phase " + phase);
                    // Wait for workers to complete. The returned value is a phase number,
                    // not a party count; the "Arrived" label is kept for output compatibility.
                    int phaseNumber = phaser.arriveAndAwaitAdvance();
                    System.out.println(coordinatorName + ": Phase " + phase +
                            " completed. Arrived: " + phaseNumber);
                    // Simulate coordination work
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } finally {
                // Releases any waiting workers and makes isTerminated() return true.
                phaser.forceTermination();
            }
        }

        /**
         * Registers the calling thread as a party and works phase by phase until the
         * phaser terminates or the thread is interrupted.
         *
         * @param workerName name printed in the log lines
         */
        public void workerTask(String workerName) {
            phaser.register(); // Register this worker
            try {
                while (!phaser.isTerminated()) {
                    int phase = phaser.getPhase();
                    System.out.println(workerName + " working on phase " + phase);
                    Thread.sleep(800 + (long) (Math.random() * 400));
                    System.out.println(workerName + " completed phase " + phase);
                    phaser.arriveAndAwaitAdvance();
                }
            } catch (InterruptedException e) {
                phaser.arriveAndDeregister();
            }
            System.out.println(workerName + " terminated");
        }
    }

    /**
     * Two groups of workers, each synchronized by its own phaser. Tier 2 is held back
     * until {@link #startTier2()} is called.
     */
    static class TieredProcessing {
        private final Phaser tier1Phaser;
        private final Phaser tier2Phaser;
        // Guarded by "this"; ensures the gate party is deregistered at most once.
        private boolean tier2Started;

        /**
         * @param tier1Workers number of tier 1 parties
         * @param tier2Workers number of tier 2 parties
         */
        public TieredProcessing(int tier1Workers, int tier2Workers) {
            this.tier1Phaser = new Phaser(tier1Workers);
            // One extra party acts as the gate that startTier2() opens. Without it the
            // tier 2 workers would advance among themselves immediately, and startTier2()
            // would arrive as an unregistered party and could block forever.
            this.tier2Phaser = new Phaser(tier2Workers + 1);
        }

        /**
         * Processes two batches, synchronizing with the other tier 1 workers after each.
         *
         * @param workerName name printed in the log lines
         */
        public void tier1Worker(String workerName) {
            try {
                for (int i = 0; i < 2; i++) {
                    System.out.println(workerName + " (Tier 1) processing batch " + i);
                    Thread.sleep(1000);
                    tier1Phaser.arriveAndAwaitAdvance();
                    System.out.println(workerName + " (Tier 1) completed batch " + i);
                    // Informational only: the tier 2 gate is opened by startTier2().
                    if (i == 0) {
                        System.out.println(workerName + " signaling Tier 2 to start");
                    }
                }
            } catch (InterruptedException e) {
                tier1Phaser.arriveAndDeregister();
            }
        }

        /**
         * Waits for the tier 2 gate, then processes two batches in step with the other
         * tier 2 workers.
         *
         * @param workerName name printed in the log lines
         */
        public void tier2Worker(String workerName) {
            try {
                // Wait for tier 1 to complete first batch
                System.out.println(workerName + " (Tier 2) waiting for Tier 1...");
                for (int i = 0; i < 2; i++) {
                    // First pass: blocks until startTier2() has arrived as the gate party.
                    // Second pass: synchronizes only with the other tier 2 workers.
                    tier2Phaser.arriveAndAwaitAdvance();
                    System.out.println(workerName + " (Tier 2) processing batch " + i);
                    Thread.sleep(800);
                    System.out.println(workerName + " (Tier 2) completed batch " + i);
                }
            } catch (InterruptedException e) {
                tier2Phaser.arriveAndDeregister();
            }
        }

        /**
         * Opens the tier 2 gate. Does not block; calling it more than once has no
         * further effect.
         */
        public synchronized void startTier2() {
            if (tier2Started) {
                return;
            }
            tier2Started = true;
            // Arrive as the gate party and leave, so later phases involve workers only.
            tier2Phaser.arriveAndDeregister();
        }
    }

    /** A {@link Phaser} that logs every phase advance before applying the default policy. */
    static class MonitoringPhaser extends Phaser {
        private final String name;

        /**
         * @param parties number of initially registered parties
         * @param name    label printed in front of every advance message
         */
        public MonitoringPhaser(int parties, String name) {
            super(parties);
            this.name = name;
        }

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println(name + " - Phase " + phase +
                    " advance. Registered: " + registeredParties);
            return super.onAdvance(phase, registeredParties);
        }
    }

    /**
     * Runs the three Phaser demos in sequence, sleeping between them to let the worker
     * threads make progress.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Multi-Phase Processing
        System.out.println("=== Test 1: Multi-Phase Processing ===");
        MultiPhaseProcessor processor = new MultiPhaseProcessor(3, 4);
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Worker-" + i;
            new Thread(() -> {
                processor.processPhase(workerName);
            }).start();
        }
        Thread.sleep(5000);

        // Test 2: Dynamic Workforce
        System.out.println("\n=== Test 2: Dynamic Workforce ===");
        DynamicWorkforce workforce = new DynamicWorkforce();
        // Start coordinator
        new Thread(() -> {
            workforce.coordinateWork("Coordinator");
        }).start();
        Thread.sleep(500);
        // Add workers dynamically
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Dynamic-Worker-" + i;
            new Thread(() -> {
                workforce.workerTask(workerName);
            }).start();
            Thread.sleep(300);
        }
        Thread.sleep(2000);
        // Remove a worker
        workforce.removeWorker("Dynamic-Worker-2");
        Thread.sleep(3000);

        // Test 3: Tiered Processing
        System.out.println("\n=== Test 3: Tiered Processing ===");
        TieredProcessing tiered = new TieredProcessing(2, 2);
        for (int i = 1; i <= 2; i++) {
            final String workerName = "T1-Worker-" + i;
            new Thread(() -> {
                tiered.tier1Worker(workerName);
            }).start();
        }
        for (int i = 1; i <= 2; i++) {
            final String workerName = "T2-Worker-" + i;
            new Thread(() -> {
                tiered.tier2Worker(workerName);
            }).start();
        }
        // Start tier 2 after a delay
        Thread.sleep(1500);
        tiered.startTier2();
        Thread.sleep(4000);

        System.out.println("All Phaser tests completed");
    }
}
Exchanger
Exchanger for Data Exchange
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Demonstrations of {@link Exchanger}: a request/response producer-consumer pair, two
 * nodes swapping a resource, a ring of participants passing messages, and two endpoints
 * swapping message objects.
 */
public class ExchangerCommunication {

    /**
     * Producer and consumer that talk over a single exchanger using two rendezvous per
     * item: one to hand the item over, one to hand the response back.
     */
    static class DataProducerConsumer {
        private final Exchanger<String> exchanger;
        private volatile boolean running = true;

        public DataProducerConsumer() {
            this.exchanger = new Exchanger<>();
        }

        /**
         * Produces five items, and for each one hands it to the consumer and then collects
         * the consumer's response.
         *
         * @param producerName name used in the items and in the log lines
         */
        public void producer(String producerName) {
            try {
                for (int i = 1; i <= 5; i++) {
                    String data = producerName + "-Data-" + i;
                    System.out.println(producerName + " produced: " + data);
                    // First rendezvous: hand the item over. The consumer offers null here,
                    // so the returned value carries no information.
                    exchanger.exchange(data);
                    // Second rendezvous: pairs with the consumer's exchange(response).
                    // A single exchange per item would pair with the consumer's two calls
                    // alternately, returning null/stale replies and losing every other item.
                    String response = exchanger.exchange(null);
                    System.out.println(producerName + " received response: " + response);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Also reached on interrupt, so the consumer is never left believing
                // that more data is coming.
                running = false;
            }
        }

        /**
         * Receives items until the producer stops or no item arrives within two seconds,
         * answering each item with a response.
         *
         * @param consumerName name printed in the log lines
         */
        public void consumer(String consumerName) {
            try {
                while (running) {
                    // Wait for data from producer
                    String data = exchanger.exchange(null, 2, TimeUnit.SECONDS);
                    if (data != null) {
                        System.out.println(consumerName + " received: " + data);
                        // Process data and send response
                        String response = "Processed-" + data;
                        System.out.println(consumerName + " sending response: " + response);
                        exchanger.exchange(response);
                    }
                }
            } catch (InterruptedException | TimeoutException e) {
                System.out.println(consumerName + " stopped: " + e.getClass().getSimpleName());
            }
        }
    }

    /** Two nodes that repeatedly swap an integer resource and modify what they receive. */
    static class ResourceExchanger {
        private final Exchanger<Integer> resourceExchanger;

        public ResourceExchanger() {
            this.resourceExchanger = new Exchanger<>();
        }

        /**
         * Starts with 100, swaps three times with the peer and adds 10 after each swap.
         *
         * @param nodeName name printed in the log lines
         */
        public void nodeA(String nodeName) {
            try {
                int resource = 100; // Initial resource
                for (int i = 0; i < 3; i++) {
                    System.out.println(nodeName + " has resource: " + resource);
                    // Exchange resource with node B
                    System.out.println(nodeName + " exchanging resource...");
                    resource = resourceExchanger.exchange(resource);
                    System.out.println(nodeName + " received resource: " + resource);
                    resource += 10; // Modify resource
                    Thread.sleep(1500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Starts with 200, swaps three times with the peer and subtracts 5 after each swap.
         *
         * @param nodeName name printed in the log lines
         */
        public void nodeB(String nodeName) {
            try {
                int resource = 200; // Initial resource
                for (int i = 0; i < 3; i++) {
                    System.out.println(nodeName + " has resource: " + resource);
                    // Exchange resource with node A
                    System.out.println(nodeName + " exchanging resource...");
                    resource = resourceExchanger.exchange(resource);
                    System.out.println(nodeName + " received resource: " + resource);
                    resource -= 5; // Modify resource
                    Thread.sleep(1200);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * A ring of participants. Exchanger {@code i} is the channel between participant
     * {@code i} and participant {@code (i + 1) % n}; in every round each participant
     * sends its message to its successor and receives one from its predecessor.
     */
    static class MultiPartyExchange {
        private final Exchanger<String>[] exchangers;

        /**
         * @param numParties number of participants in the ring
         */
        @SuppressWarnings("unchecked")
        public MultiPartyExchange(int numParties) {
            exchangers = new Exchanger[numParties];
            for (int i = 0; i < numParties; i++) {
                exchangers[i] = new Exchanger<>();
            }
        }

        /**
         * Runs one participant: for each message, passes it to the next participant and
         * receives the previous participant's message for the same round.
         *
         * <p>All participants must run concurrently and supply the same number of
         * messages, otherwise the remaining ones block waiting for a partner.
         *
         * @param id       index of this participant, {@code 0 <= id < numParties}
         * @param messages one outgoing message per round
         */
        public void participant(int id, String[] messages) {
            final int parties = exchangers.length;
            // Channel shared with the successor (this side sends) ...
            final Exchanger<String> outbound = exchangers[id];
            // ... and channel shared with the predecessor (this side receives).
            final Exchanger<String> inbound = exchangers[Math.floorMod(id - 1, parties)];
            // Even ids send first and odd ids receive first. If everyone sent first, each
            // participant would wait on its own channel for a successor that is itself
            // waiting, and the whole ring would deadlock.
            final boolean sendFirst = (id % 2 == 0);
            try {
                for (int round = 0; round < messages.length; round++) {
                    String messageToSend = messages[round];
                    System.out.println("Participant " + id + " sending: " + messageToSend);
                    String receivedMessage;
                    if (parties == 1) {
                        // A ring of one has no partner to rendezvous with.
                        receivedMessage = messageToSend;
                    } else if (sendFirst) {
                        outbound.exchange(messageToSend);
                        receivedMessage = inbound.exchange(null);
                    } else {
                        receivedMessage = inbound.exchange(null);
                        outbound.exchange(messageToSend);
                    }
                    System.out.println("Participant " + id + " received: " + receivedMessage);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** Two endpoints that swap {@link Message} objects four times. */
    static class BidirectionalCommunication {
        private final Exchanger<Message> exchanger;

        public BidirectionalCommunication() {
            this.exchanger = new Exchanger<>();
        }

        /** Immutable payload carrying text and a sequence number. */
        static class Message {
            final String content;
            final int sequence;

            Message(String content, int sequence) {
                this.content = content;
                this.sequence = sequence;
            }

            @Override
            public String toString() {
                return "Message{seq=" + sequence + ", content='" + content + "'}";
            }
        }

        /**
         * Sends four "Hello" messages, swapping each with the peer's message.
         *
         * @param name endpoint name used in the message text and log lines
         */
        public void endpointA(String name) {
            try {
                for (int i = 1; i <= 4; i++) {
                    Message outbound = new Message("Hello from " + name, i);
                    System.out.println(name + " sending: " + outbound);
                    Message inbound = exchanger.exchange(outbound);
                    System.out.println(name + " received: " + inbound);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Sends four "Response" messages, swapping each with the peer's message.
         *
         * @param name endpoint name used in the message text and log lines
         */
        public void endpointB(String name) {
            try {
                for (int i = 1; i <= 4; i++) {
                    Message outbound = new Message("Response from " + name, i);
                    System.out.println(name + " sending: " + outbound);
                    Message inbound = exchanger.exchange(outbound);
                    System.out.println(name + " received: " + inbound);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs the four Exchanger demos in sequence, joining the threads of each demo before
     * starting the next.
     *
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Producer-Consumer with Exchanger
        System.out.println("=== Test 1: Producer-Consumer Exchange ===");
        DataProducerConsumer pc = new DataProducerConsumer();
        Thread producer = new Thread(() -> {
            pc.producer("Producer");
        });
        Thread consumer = new Thread(() -> {
            pc.consumer("Consumer");
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        // Test 2: Resource Exchange
        System.out.println("\n=== Test 2: Resource Exchange ===");
        ResourceExchanger resourceEx = new ResourceExchanger();
        Thread nodeA = new Thread(() -> {
            resourceEx.nodeA("Node-A");
        });
        Thread nodeB = new Thread(() -> {
            resourceEx.nodeB("Node-B");
        });
        nodeA.start();
        nodeB.start();
        nodeA.join();
        nodeB.join();

        // Test 3: Multi-party Exchange
        System.out.println("\n=== Test 3: Multi-party Exchange ===");
        MultiPartyExchange multiExchange = new MultiPartyExchange(3);
        String[][] messages = {
            {"Hello from 0", "Second from 0", "Third from 0"},
            {"Hi from 1", "Second from 1", "Third from 1"},
            {"Hey from 2", "Second from 2", "Third from 2"}
        };
        Thread[] participants = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int participantId = i;
            participants[i] = new Thread(() -> {
                multiExchange.participant(participantId, messages[participantId]);
            });
            participants[i].start();
        }
        for (Thread t : participants) {
            t.join();
        }

        // Test 4: Bidirectional Communication
        System.out.println("\n=== Test 4: Bidirectional Communication ===");
        BidirectionalCommunication bidirectional = new BidirectionalCommunication();
        Thread endpoint1 = new Thread(() -> {
            bidirectional.endpointA("Endpoint-A");
        });
        Thread endpoint2 = new Thread(() -> {
            bidirectional.endpointB("Endpoint-B");
        });
        endpoint1.start();
        endpoint2.start();
        endpoint1.join();
        endpoint2.join();

        System.out.println("All Exchanger tests completed");
    }
}
Semaphore
Semaphore for Resource Management
```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreCommunication {
/**
 * Fixed-size pool of numbered resources. A fair semaphore limits how many resources can
 * be held at once; a boolean array records which indices are currently free.
 */
static class ResourcePool {
    private final Semaphore semaphore;
    // resources[i] == true means resource i is free; guarded by "this".
    private final boolean[] resources;
    private final int poolSize;

    /**
     * @param poolSize number of resources in the pool (all initially available)
     */
    public ResourcePool(int poolSize) {
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(poolSize, true); // Fair semaphore
        this.resources = new boolean[poolSize];
        // Initialize all resources as available
        for (int i = 0; i < poolSize; i++) {
            resources[i] = true;
        }
    }

    /**
     * Blocks until a resource is free and claims it.
     *
     * @return index of the claimed resource, to be passed to {@link #releaseResource(int)}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public int acquireResource() throws InterruptedException {
        semaphore.acquire();
        return getNextAvailableResource();
    }

    /**
     * Tries to claim a resource within the given time.
     *
     * <p>This method reports only success or failure, so the caller does not learn which
     * resource it holds and cannot release it. Prefer
     * {@link #tryAcquireResourceId(long, TimeUnit)} when the resource must be returned.
     *
     * @param timeout maximum time to wait for a permit
     * @param unit    unit of {@code timeout}
     * @return {@code true} if a resource was claimed, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean tryAcquireResource(long timeout, TimeUnit unit) throws InterruptedException {
        return tryAcquireResourceId(timeout, unit) != -1;
    }

    /**
     * Tries to claim a resource within the given time and returns its index.
     *
     * @param timeout maximum time to wait for a permit
     * @param unit    unit of {@code timeout}
     * @return index of the claimed resource, or {@code -1} if none became free in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public int tryAcquireResourceId(long timeout, TimeUnit unit) throws InterruptedException {
        if (!semaphore.tryAcquire(timeout, unit)) {
            return -1;
        }
        return getNextAvailableResource();
    }

    /**
     * Returns a resource to the pool. Releasing a resource that is already free is
     * ignored, so a double release cannot inflate the permit count.
     *
     * @param resourceId index previously returned by an acquire method
     */
    public void releaseResource(int resourceId) {
        // Mark the slot free before releasing the permit, so a thread that obtains the
        // permit is guaranteed to find a free slot.
        if (markAsAvailable(resourceId)) {
            semaphore.release();
        }
    }

    /** Claims and returns the lowest free index, or -1 if none is free. */
    private synchronized int getNextAvailableResource() {
        for (int i = 0; i < poolSize; i++) {
            if (resources[i]) {
                resources[i] = false;
                System.out.println(Thread.currentThread().getName() + " acquired resource " + i);
                return i;
            }
        }
        return -1; // Should not happen with proper semaphore usage
    }

    /** Marks the index free; returns false if it was free already. */
    private synchronized boolean markAsAvailable(int resourceId) {
        if (!resources[resourceId]) {
            resources[resourceId] = true;
            System.out.println(Thread.currentThread().getName() + " released resource " + resourceId);
            return true;
        }
        return false;
    }

    /** @return number of permits (free resources) currently available */
    public int availablePermits() {
        return semaphore.availablePermits();
    }
}
/**
 * Semaphore-based rate limiter: callers consume permits, and a background thread tops
 * the permit count back up to {@code maxPermits} once per second.
 */
static class RateLimiter {
    private final Semaphore semaphore;
    private final int maxPermits;
    private final Thread refillThread;

    /**
     * Creates the limiter and starts its refill thread.
     *
     * <p>The refill thread is a daemon so that an abandoned limiter cannot keep the JVM
     * alive; call {@link #shutdown()} to stop it explicitly.
     *
     * @param maxPermits number of permits available per refill interval
     */
    public RateLimiter(int maxPermits) {
        this.maxPermits = maxPermits;
        this.semaphore = new Semaphore(maxPermits);
        // Refill permits periodically
        this.refillThread = new Thread(this::refillLoop, "rate-limiter-refill");
        this.refillThread.setDaemon(true);
        this.refillThread.start();
    }

    /**
     * Tries to take a permit without waiting.
     *
     * @return {@code true} if a permit was taken
     */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    /**
     * Takes a permit, waiting for the next refill if none is available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }

    /** Stops the background refill thread. Permits are no longer replenished afterwards. */
    public void shutdown() {
        refillThread.interrupt();
    }

    /** Body of the refill thread: refill once per second until interrupted. */
    private void refillLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000); // Refill every second
                refillPermits();
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    /** Releases however many permits are missing to bring the count back to the maximum. */
    private synchronized void refillPermits() {
        int currentPermits = semaphore.availablePermits();
        if (currentPermits < maxPermits) {
            int refillAmount = maxPermits - currentPermits;
            semaphore.release(refillAmount);
            System.out.println("Refilled " + refillAmount + " permits. Total: " +
                    semaphore.availablePermits());
        }
    }

    /** @return number of permits currently available */
    public int getAvailablePermits() {
        return semaphore.availablePermits();
    }
}
static class BoundedBuffer<T> {
private final Semaphore emptySlots;
private final Semaphore fullSlots;
private final Semaphore mutex;
private final T[] buffer;
private int putIndex = 0;
private int takeIndex = 0;
private int count = 0;
@SuppressWarnings("unchecked")
public BoundedBuffer(int capacity) {
this.buffer = (T[]) new Object[capacity];
this.emptySlots = new Semaphore(capacity);
this.fullSlots = new Semaphore(0);
this.mutex = new Semaphore(1); // Binary semaphore as mutex
}
public void put(T item) throws InterruptedException {
emptySlots.acquire(); // Wait for empty slot
mutex.acquire(); // Enter critical section
try {
buffer[putIndex] = item;
putIndex = (putIndex + 1) % buffer.length;
count++;
System.out.println(Thread.currentThread().getName() + " put: " + item +
" | Count: " + count);
} finally {
mutex.release(); // Exit critical section
fullSlots.release(); //