Inter-Thread Communication in Java: Complete Guide

Table of Contents

  1. Introduction to Inter-Thread Communication
  2. wait(), notify(), notifyAll()
  3. Producer-Consumer Pattern
  4. BlockingQueue Implementation
  5. Synchronized Methods and Blocks
  6. Lock and Condition
  7. Volatile Keyword
  8. Atomic Variables
  9. CountDownLatch
  10. CyclicBarrier
  11. Phaser
  12. Exchanger
  13. Semaphore

Introduction to Inter-Thread Communication

Inter-thread communication is the mechanism that allows synchronized threads to communicate with each other. It's essential for coordinating activities between threads and avoiding thread interference.

Why Inter-Thread Communication?

  • Coordination: Threads need to coordinate their activities
  • Resource Sharing: Safe sharing of resources between threads
  • Efficiency: Avoid busy waiting so CPU cycles are not wasted on polling
  • Synchronization: Ensure proper ordering of operations

Basic Communication Problem

/**
 * Shows the naive way to hand a value from one thread to another: the consumer
 * spins on a shared flag until the producer sets it.
 *
 * <p>The busy-wait loop is kept on purpose because it is the problem this example
 * illustrates. The flag is {@code volatile} so that the spinning consumer is
 * guaranteed to observe the producer's write and the demo always terminates.
 */
public class BasicCommunicationProblem {
    // Written before, and read after, the volatile flag, so the flag publishes it safely.
    private static String message;
    // volatile: without it the consumer's loop is allowed to never see the update.
    private static volatile boolean isMessageReady = false;

    /**
     * Starts one producer and one consumer thread and waits for both to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(2000); // Simulate work
                message = "Hello from Producer!";
                isMessageReady = true; // volatile write publishes message as well
                System.out.println("Producer: Message produced");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                e.printStackTrace();
            }
        });
        // Consumer thread - PROBLEM: Busy waiting!
        Thread consumer = new Thread(() -> {
            while (!isMessageReady) {
                // Busy waiting - wastes CPU cycles
            }
            System.out.println("Consumer: Received - " + message);
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

wait(), notify(), notifyAll()

Basic wait() and notify() Usage

/**
 * Minimal wait()/notify() hand-off: one thread waits on a shared monitor until
 * another thread flips a boolean and notifies it.
 */
public class WaitNotifyBasics {
    private static final Object lock = new Object();
    private static boolean condition = false;

    /**
     * Runs the waiting and the notifying thread and joins both.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Thread waitingThread = new Thread(WaitNotifyBasics::awaitCondition);
        Thread notifyingThread = new Thread(WaitNotifyBasics::raiseCondition);
        waitingThread.start();
        notifyingThread.start();
        try {
            waitingThread.join();
            notifyingThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Body of the waiting thread: blocks on the monitor until the condition is true. */
    private static void awaitCondition() {
        synchronized (lock) {
            System.out.println("Waiting thread: Acquired lock, checking condition");
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (!condition) {
                try {
                    System.out.println("Waiting thread: Condition false, going to wait");
                    lock.wait(); // gives up the monitor while parked
                    System.out.println("Waiting thread: Woke up, checking condition again");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Waiting thread: Condition is true, proceeding");
        }
    }

    /** Body of the notifying thread: sleeps, then sets the condition and notifies. */
    private static void raiseCondition() {
        try {
            Thread.sleep(2000); // pretend to do some work first
            synchronized (lock) {
                System.out.println("Notifying thread: Acquired lock, changing condition");
                condition = true;
                lock.notify(); // one waiter is enough here
                System.out.println("Notifying thread: Notified waiting thread");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

wait() and notifyAll() Example

/**
 * Several consumers wait on one monitor for resources that a single producer
 * publishes in one batch, waking all of them with {@code notifyAll()}.
 *
 * <p>There are more consumers (5) than resources (3). Consumers that lose the race
 * stop waiting once the producer has finished, so the program always terminates.
 */
public class WaitNotifyAllExample {
    private static final Object lock = new Object();
    private static int availableResources = 0;
    // Guarded by lock. True once the producer will not publish anything else.
    private static boolean productionFinished = false;

    /** Consumer thread that takes exactly one resource, or gives up when none is left. */
    static class ResourceConsumer extends Thread {
        /**
         * @param id number used only to build the thread name {@code Consumer-<id>}
         */
        public ResourceConsumer(int id) {
            setName("Consumer-" + id);
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(getName() + ": Waiting for resource");
                // Keep waiting only while a resource may still arrive.
                while (availableResources == 0 && !productionFinished) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return; // stop waiting when asked to
                    }
                }
                if (availableResources == 0) {
                    System.out.println(getName() + ": No resource left, giving up");
                    return;
                }
                availableResources--;
                System.out.println(getName() + ": Got resource. Remaining: " + availableResources);
            }
        }
    }

    /** Producer thread that publishes three resources after a simulated delay. */
    static class ResourceProducer extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000); // Simulate production time
                synchronized (lock) {
                    availableResources = 3;
                    System.out.println("Producer: Produced 3 resources");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the waiters, even if production was interrupted.
                synchronized (lock) {
                    productionFinished = true;
                    lock.notifyAll(); // Notify all waiting threads
                    System.out.println("Producer: Notified all waiting consumers");
                }
            }
        }
    }

    /**
     * Starts five consumers and one producer.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Create multiple consumers
        for (int i = 1; i <= 5; i++) {
            new ResourceConsumer(i).start();
        }
        // Create producer
        new ResourceProducer().start();
    }
}

Common wait()/notify() Patterns

/**
 * Three recurring wait()/notify() idioms: a guarded block, a wait bounded by a
 * timeout, and several independent conditions sharing one monitor.
 */
public class WaitNotifyPatterns {
    /** Pattern 1: Guarded Block - an action that runs only after a condition becomes true. */
    static class GuardedBlock {
        private boolean condition = false;
        private final Object lock = new Object();

        /**
         * Blocks until {@link #makeConditionTrue()} has been called, then prints a message.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void doWhenCondition() throws InterruptedException {
            synchronized (lock) {
                while (!condition) {
                    lock.wait();
                }
                // Perform action when condition is true
                System.out.println("Condition met, performing action");
            }
        }

        /** Sets the condition and wakes every thread waiting for it. */
        public void makeConditionTrue() {
            synchronized (lock) {
                condition = true;
                lock.notifyAll();
            }
        }
    }

    /** Pattern 2: Timeout wait - waits for a flag, but no longer than a given time. */
    static class TimeoutWait {
        private final Object lock = new Object();
        private boolean ready = false;

        /**
         * Waits until {@link #setReady()} is called or the timeout elapses, then prints
         * which of the two happened.
         *
         * @param timeoutMs maximum time to wait in milliseconds; zero or negative means
         *                  the flag is checked once without waiting
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitWithTimeout(long timeoutMs) throws InterruptedException {
            synchronized (lock) {
                // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
                long remainingNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                final long deadline = System.nanoTime() + remainingNanos;
                while (!ready && remainingNanos > 0) {
                    // Never pass 0: wait(0) means "wait without a time limit".
                    lock.wait(Math.max(1L, remainingNanos / 1_000_000L));
                    remainingNanos = deadline - System.nanoTime();
                }
                if (ready) {
                    System.out.println("Condition met within timeout");
                } else {
                    System.out.println("Timeout occurred");
                }
            }
        }

        /** Sets the flag and wakes one waiting thread. */
        public void setReady() {
            synchronized (lock) {
                ready = true;
                lock.notify();
            }
        }
    }

    /** Pattern 3: Multiple conditions - two independent flags guarded by the same monitor. */
    static class MultipleConditions {
        private final Object lock = new Object();
        private boolean condition1 = false;
        private boolean condition2 = false;

        /**
         * Blocks until condition 1 has been set.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForCondition1() throws InterruptedException {
            synchronized (lock) {
                while (!condition1) {
                    lock.wait();
                }
                System.out.println("Condition 1 met");
            }
        }

        /**
         * Blocks until condition 2 has been set.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForCondition2() throws InterruptedException {
            synchronized (lock) {
                while (!condition2) {
                    lock.wait();
                }
                System.out.println("Condition 2 met");
            }
        }

        /** Sets condition 1 and wakes all waiters. */
        public void setCondition1() {
            synchronized (lock) {
                condition1 = true;
                // notifyAll: waiters for either condition share this monitor, so a single
                // notify() could wake a thread that is waiting for the other flag.
                lock.notifyAll();
            }
        }

        /** Sets condition 2 and wakes all waiters. */
        public void setCondition2() {
            synchronized (lock) {
                condition2 = true;
                lock.notifyAll();
            }
        }
    }

    /**
     * Exercises the guarded block (condition set after one second) and the timeout
     * wait (flag never set, so it times out).
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test Guarded Block
        GuardedBlock guardedBlock = new GuardedBlock();
        new Thread(() -> {
            try {
                guardedBlock.doWhenCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }).start();
        Thread.sleep(1000);
        guardedBlock.makeConditionTrue();
        // Test Timeout Wait
        TimeoutWait timeoutWait = new TimeoutWait();
        new Thread(() -> {
            try {
                timeoutWait.waitWithTimeout(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }).start();
        // Don't set ready - will timeout
        Thread.sleep(4000);
    }
}

Producer-Consumer Pattern

Basic Producer-Consumer with wait/notify

/**
 * Bounded-buffer producer/consumer built on a single monitor.
 *
 * <p>The buffer is a fixed-size ring of ints shared by all threads. Producers wait
 * while it is full, consumers wait while it is empty, and every change is
 * announced with {@code notifyAll()}.
 */
public class ProducerConsumerWaitNotify {
    private static final int BUFFER_SIZE = 5;
    private static final int[] buffer = new int[BUFFER_SIZE];
    private static int count = 0;
    private static int putIndex = 0;
    private static int takeIndex = 0;
    private static final Object lock = new Object();

    /** Appends one item to the ring, waiting while it is full; {@code who} labels the output. */
    private static void store(String who, int item) throws InterruptedException {
        synchronized (lock) {
            while (count == BUFFER_SIZE) {
                System.out.println(who + ": Buffer full, waiting...");
                lock.wait();
            }
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % BUFFER_SIZE;
            count++;
            System.out.println(who + ": Produced " + item + ", count: " + count);
            lock.notifyAll(); // a consumer may be waiting for this item
        }
    }

    /** Removes the oldest item from the ring, waiting while it is empty; {@code who} labels the output. */
    private static void fetch(String who) throws InterruptedException {
        synchronized (lock) {
            while (count == 0) {
                System.out.println(who + ": Buffer empty, waiting...");
                lock.wait();
            }
            int taken = buffer[takeIndex];
            takeIndex = (takeIndex + 1) % BUFFER_SIZE;
            count--;
            System.out.println(who + ": Consumed " + taken + ", count: " + count);
            lock.notifyAll(); // a producer may be waiting for this slot
        }
    }

    /** Thread that produces the values 0 through 9. */
    static class Producer extends Thread {
        private final int id;

        public Producer(int id) {
            this.id = id;
            setName("Producer-" + id);
        }

        @Override
        public void run() {
            int next = 0;
            while (next < 10) {
                try {
                    produce(next);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                next++;
            }
        }

        private void produce(int item) throws InterruptedException {
            store(getName(), item);
        }
    }

    /** Thread that consumes ten values. */
    static class Consumer extends Thread {
        private final int id;

        public Consumer(int id) {
            this.id = id;
            setName("Consumer-" + id);
        }

        @Override
        public void run() {
            int remaining = 10;
            while (remaining > 0) {
                try {
                    consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                remaining--;
            }
        }

        private void consume() throws InterruptedException {
            fetch(getName());
        }
    }

    /**
     * Runs two producers against two consumers and waits for all four to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Thread[] workers = {
            new Producer(1), new Producer(2), new Consumer(1), new Consumer(2)
        };
        for (Thread worker : workers) {
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Producer-Consumer completed");
    }
}

Advanced Producer-Consumer with Multiple Producers and Consumers

/**
 * Bounded ring buffer of ints shared by several producers and consumers, all
 * coordinated through one monitor and {@code notifyAll()}.
 */
public class AdvancedProducerConsumer {
    private static final int BUFFER_SIZE = 3;
    private static final int PRODUCER_COUNT = 2;
    private static final int CONSUMER_COUNT = 3;
    private static final int ITEMS_PER_CONSUMER = 4;
    // Derived so that exactly as many items are produced as are consumed;
    // otherwise the surplus consume() calls would block forever.
    private static final int ITEMS_PER_PRODUCER =
            CONSUMER_COUNT * ITEMS_PER_CONSUMER / PRODUCER_COUNT;

    private final int[] buffer = new int[BUFFER_SIZE];
    private int count = 0;
    private int putIndex = 0;
    private int takeIndex = 0;
    private final Object lock = new Object();

    /**
     * Adds an item to the buffer, blocking while the buffer is full.
     *
     * @param item         value to store
     * @param producerName label used in the printed trace
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void produce(int item, String producerName) throws InterruptedException {
        synchronized (lock) {
            // Wait while buffer is full
            while (count == BUFFER_SIZE) {
                System.out.println(producerName + ": Buffer FULL - waiting...");
                lock.wait();
            }
            // Produce item
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % BUFFER_SIZE;
            count++;
            System.out.println(producerName + ": Produced " + item +
                    " | Buffer: [" + getBufferContents() + "]");
            // Producers and consumers wait on the same monitor, so wake all of them.
            lock.notifyAll();
        }
    }

    /**
     * Removes and returns the oldest item, blocking while the buffer is empty.
     *
     * @param consumerName label used in the printed trace
     * @return the item that was removed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public int consume(String consumerName) throws InterruptedException {
        synchronized (lock) {
            // Wait while buffer is empty
            while (count == 0) {
                System.out.println(consumerName + ": Buffer EMPTY - waiting...");
                lock.wait();
            }
            // Consume item
            int item = buffer[takeIndex];
            takeIndex = (takeIndex + 1) % BUFFER_SIZE;
            count--;
            System.out.println(consumerName + ": Consumed " + item +
                    " | Buffer: [" + getBufferContents() + "]");
            // Notify all waiting threads
            lock.notifyAll();
            return item;
        }
    }

    /** Renders the live items oldest-first as a comma-separated list; callers hold the lock. */
    private String getBufferContents() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            int index = (takeIndex + i) % BUFFER_SIZE;
            sb.append(buffer[index]);
            if (i < count - 1) {
                sb.append(", ");
            }
        }
        return sb.toString();
    }

    /**
     * Starts the producer and consumer threads with matching item totals.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        AdvancedProducerConsumer pc = new AdvancedProducerConsumer();
        // Create multiple producers
        for (int i = 1; i <= PRODUCER_COUNT; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < ITEMS_PER_PRODUCER; j++) {
                        pc.produce(producerId * 100 + j, "Producer-" + producerId);
                        Thread.sleep((long) (Math.random() * 1000));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }).start();
        }
        // Create multiple consumers
        for (int i = 1; i <= CONSUMER_COUNT; i++) {
            final int consumerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < ITEMS_PER_CONSUMER; j++) {
                        pc.consume("Consumer-" + consumerId);
                        Thread.sleep((long) (Math.random() * 1500));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

BlockingQueue Implementation

Custom BlockingQueue Implementation

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
 * Minimal bounded blocking queue built on {@code wait()}/{@code notifyAll()}.
 *
 * <p>All state is guarded by one private monitor. {@link #put} and {@link #take}
 * print a trace line for each operation.
 *
 * @param <T> element type
 */
public class CustomBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;
    private final Object lock = new Object();

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public CustomBlockingQueue(int capacity) {
        // With capacity 0 every put() would block forever; a negative capacity would
        // never equal size() and silently make the queue unbounded.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    /**
     * Appends an item, blocking while the queue is full.
     *
     * @param item element to add
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(T item) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() == capacity) {
                System.out.println(Thread.currentThread().getName() + ": Queue full, waiting...");
                lock.wait();
            }
            queue.offer(item);
            System.out.println(Thread.currentThread().getName() + ": Added " + item +
                    " | Size: " + queue.size());
            lock.notifyAll();
        }
    }

    /**
     * Removes and returns the head of the queue, blocking while it is empty.
     *
     * @return the removed element
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public T take() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": Queue empty, waiting...");
                lock.wait();
            }
            T item = queue.poll();
            System.out.println(Thread.currentThread().getName() + ": Removed " + item +
                    " | Size: " + queue.size());
            lock.notifyAll();
            return item;
        }
    }

    /**
     * Appends an item, waiting up to the given time for space to become available.
     *
     * @param item    element to add
     * @param timeout how long to wait before giving up
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the item was added, {@code false} if the queue was still
     *         full when the timeout elapsed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        synchronized (lock) {
            long nanosToWait = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanosToWait;
            while (queue.size() == capacity) {
                if (nanosToWait <= 0) {
                    return false; // Timeout
                }
                // timedWait splits the nanoseconds into the (millis, nanos) pair wait() expects.
                TimeUnit.NANOSECONDS.timedWait(lock, nanosToWait);
                nanosToWait = deadline - System.nanoTime();
            }
            queue.offer(item);
            lock.notifyAll();
            return true;
        }
    }

    /** @return the number of elements currently queued */
    public int size() {
        synchronized (lock) {
            return queue.size();
        }
    }

    /** @return {@code true} if no element is currently queued */
    public boolean isEmpty() {
        synchronized (lock) {
            return queue.isEmpty();
        }
    }
}
// Test the Custom BlockingQueue
/**
 * Drives a {@code CustomBlockingQueue} of capacity 3 with one producer that adds
 * ten integers and one slower consumer that removes ten.
 */
class BlockingQueueTest {
    /**
     * Starts both threads, waits for them and prints a completion line.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(3);
        Thread producer = new Thread(() -> feed(queue), "Producer");
        Thread consumer = new Thread(() -> drain(queue), "Consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("BlockingQueue test completed");
    }

    /** Producer body: puts 1..10, pausing 200 ms after each put. */
    private static void feed(CustomBlockingQueue<Integer> target) {
        try {
            int value = 1;
            while (value <= 10) {
                target.put(value);
                Thread.sleep(200);
                value++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Consumer body: takes ten elements, pausing 500 ms after each take. */
    private static void drain(CustomBlockingQueue<Integer> source) {
        try {
            int remaining = 10;
            while (remaining > 0) {
                source.take();
                Thread.sleep(500);
                remaining--;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Synchronized Methods and Blocks

Synchronized Communication

/**
 * Thread communication through intrinsic locks: a one-slot producer/consumer using
 * synchronized methods, and a reader/writer gate using a synchronized block on a
 * dedicated monitor.
 */
public class SynchronizedCommunication {
    /** Holds the one-slot value and the reader/writer gate used by the demo. */
    static class SharedResource {
        private int value = 0;
        private boolean hasValue = false;

        /**
         * Stores a value, blocking while the previous one has not been consumed.
         *
         * @param newValue value to publish
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized void produce(int newValue) throws InterruptedException {
            while (hasValue) {
                System.out.println(Thread.currentThread().getName() + ": Resource has value, waiting...");
                wait();
            }
            value = newValue;
            hasValue = true;
            System.out.println(Thread.currentThread().getName() + ": Produced " + value);
            notifyAll(); // Notify all waiting consumers
        }

        /**
         * Takes the current value, blocking while none is available.
         *
         * @return the value that was consumed
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized int consume() throws InterruptedException {
            while (!hasValue) {
                System.out.println(Thread.currentThread().getName() + ": No value available, waiting...");
                wait();
            }
            hasValue = false;
            System.out.println(Thread.currentThread().getName() + ": Consumed " + value);
            notifyAll(); // Notify all waiting producers
            return value;
        }

        // Reader/writer gate. One monitor guards both counters so that "no readers" and
        // "no writer" are checked and changed atomically; with separate monitors a reader
        // could slip in between a writer's check and its write.
        private final Object rwLock = new Object();
        private int readers = 0;
        private boolean writerActive = false;

        /**
         * Performs a simulated 100 ms read. Any number of readers may run together, but
         * a reader waits while a writer is active. Returns early, without reading, if
         * the thread is interrupted while waiting for the writer.
         */
        public void readOperation() {
            synchronized (rwLock) {
                try {
                    while (writerActive) {
                        rwLock.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                readers++;
                System.out.println(Thread.currentThread().getName() + ": Reading, readers: " + readers);
            }
            // Simulate read operation
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (rwLock) {
                readers--;
                System.out.println(Thread.currentThread().getName() + ": Finished reading, readers: " + readers);
                if (readers == 0) {
                    rwLock.notifyAll(); // Notify waiting writers
                }
            }
        }

        /**
         * Performs a simulated 200 ms write with exclusive access: waits until there are
         * no active readers and no other active writer. Writers get no priority, so a
         * steady stream of readers can delay them.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         *                              or writing
         */
        public void writeOperation() throws InterruptedException {
            synchronized (rwLock) {
                while (readers > 0 || writerActive) {
                    if (readers > 0) {
                        System.out.println(Thread.currentThread().getName() + ": Waiting for readers to finish");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ": Waiting for writer to finish");
                    }
                    rwLock.wait();
                }
                writerActive = true;
            }
            try {
                System.out.println(Thread.currentThread().getName() + ": Writing");
                // Simulate write operation
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName() + ": Finished writing");
            } finally {
                // Always reopen the gate, even when the write is interrupted.
                synchronized (rwLock) {
                    writerActive = false;
                    rwLock.notifyAll();
                }
            }
        }
    }

    /**
     * Runs the producer/consumer pair together with three readers and two writers and
     * waits for every thread to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        // Test producer-consumer with synchronized methods
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    resource.produce(i);
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    resource.consume();
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        // Test reader-writer pattern
        Thread[] helpers = new Thread[5];
        int slot = 0;
        for (int i = 1; i <= 3; i++) {
            final int readerId = i;
            Thread reader = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    resource.readOperation();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                }
            }, "Reader-" + readerId);
            helpers[slot++] = reader;
            reader.start();
        }
        for (int i = 1; i <= 2; i++) {
            final int writerId = i;
            Thread writer = new Thread(() -> {
                try {
                    for (int j = 0; j < 2; j++) {
                        resource.writeOperation();
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }, "Writer-" + writerId);
            helpers[slot++] = writer;
            writer.start();
        }
        try {
            producer.join();
            consumer.join();
            // Join instead of sleeping a fixed time so the final line is really the last one.
            for (Thread helper : helpers) {
                helper.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Synchronized communication test completed");
    }
}

Lock and Condition

Using Lock and Condition for Communication

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Thread communication with explicit {@link Lock} and {@link Condition} objects:
 * a bounded message queue with separate "not full" / "not empty" conditions, and
 * a one-slot resource with an additional completion signal.
 */
public class LockConditionCommunication {
    /** Fixed-capacity FIFO queue of strings backed by a ring array. */
    static class MessageQueue {
        private final String[] messages;
        private int putIndex = 0;
        private int takeIndex = 0;
        private int count = 0;
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        /**
         * @param capacity number of slots in the ring array
         */
        public MessageQueue(int capacity) {
            messages = new String[capacity];
        }

        /**
         * Appends a message, blocking while the queue is full.
         *
         * @param message message to add
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void put(String message) throws InterruptedException {
            lock.lock();
            try {
                while (count == messages.length) {
                    System.out.println(Thread.currentThread().getName() + ": Queue full, waiting...");
                    notFull.await();
                }
                enqueue(message);
                System.out.println(Thread.currentThread().getName() + ": Added '" + message +
                        "' | Size: " + count);
                notEmpty.signal(); // Signal waiting consumers
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the oldest message, blocking while the queue is empty.
         *
         * @return the removed message
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public String take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    System.out.println(Thread.currentThread().getName() + ": Queue empty, waiting...");
                    notEmpty.await();
                }
                String message = messages[takeIndex];
                takeIndex = (takeIndex + 1) % messages.length;
                count--;
                System.out.println(Thread.currentThread().getName() + ": Removed '" + message +
                        "' | Size: " + count);
                notFull.signal(); // Signal waiting producers
                return message;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Appends a message, waiting at most {@code timeoutMs} for a free slot.
         *
         * @param message   message to add
         * @param timeoutMs maximum time to wait in milliseconds
         * @return {@code true} if the message was added, {@code false} if the queue was
         *         still full when the timeout elapsed
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public boolean tryPut(String message, long timeoutMs) throws InterruptedException {
            lock.lock();
            try {
                // awaitNanos returns the time still left, measured on a monotonic clock.
                long nanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                while (count == messages.length) {
                    if (nanos <= 0L) {
                        return false; // Timeout
                    }
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(message);
                notEmpty.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }

        /** Writes the message into the next free slot; the caller holds the lock and has checked for space. */
        private void enqueue(String message) {
            messages[putIndex] = message;
            putIndex = (putIndex + 1) % messages.length;
            count++;
        }
    }

    /** One-slot data holder with a "completed" state that consumers and observers can wait for. */
    static class AdvancedResource {
        private final Lock lock = new ReentrantLock();
        private final Condition dataAvailable = lock.newCondition();
        private final Condition spaceAvailable = lock.newCondition();
        private final Condition operationComplete = lock.newCondition();
        private String data = null;
        private boolean completed = false;

        /**
         * Stores data in the slot, blocking while the slot is occupied.
         *
         * @param newData data to publish
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void produceData(String newData) throws InterruptedException {
            lock.lock();
            try {
                while (data != null) {
                    System.out.println(Thread.currentThread().getName() + ": Data exists, waiting for space");
                    spaceAvailable.await();
                }
                data = newData;
                System.out.println(Thread.currentThread().getName() + ": Produced data: " + data);
                dataAvailable.signal(); // Signal consumers
            } finally {
                lock.unlock();
            }
        }

        /**
         * Takes the data from the slot, blocking while it is empty and the resource has
         * not been marked completed.
         *
         * @return the consumed data, or {@code null} when the resource is completed and
         *         the slot is empty
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public String consumeData() throws InterruptedException {
            lock.lock();
            try {
                while (data == null && !completed) {
                    System.out.println(Thread.currentThread().getName() + ": No data, waiting...");
                    dataAvailable.await();
                }
                if (completed && data == null) {
                    return null; // No more data
                }
                String consumedData = data;
                data = null;
                System.out.println(Thread.currentThread().getName() + ": Consumed data: " + consumedData);
                spaceAvailable.signal(); // Signal producers
                return consumedData;
            } finally {
                lock.unlock();
            }
        }

        /** Marks the resource completed and wakes waiting consumers and completion observers. */
        public void markCompleted() {
            lock.lock();
            try {
                completed = true;
                dataAvailable.signalAll(); // Wake up all waiting consumers
                // Without this signal, threads parked in waitForCompletion() never wake up.
                operationComplete.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Blocks until {@link #markCompleted()} has been called.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForCompletion() throws InterruptedException {
            lock.lock();
            try {
                while (!completed) {
                    operationComplete.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Runs two producers (one blocking, one with a timeout) against one consumer.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Test MessageQueue with Lock and Condition
        MessageQueue queue = new MessageQueue(3);
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put("Message-" + i + "-from-P1");
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer-1");
        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String message = "Message-" + i + "-from-P2";
                    // Retry after a timeout: the consumer expects exactly ten messages, so
                    // a dropped message would leave it blocked in take() forever.
                    while (!queue.tryPut(message, 1000)) {
                        System.out.println("Producer-2: Failed to put message (timeout)");
                    }
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Producer-2");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.take();
                    Thread.sleep(400);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "Consumer");
        producer1.start();
        producer2.start();
        consumer.start();
        try {
            producer1.join();
            producer2.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Lock and Condition test completed");
    }
}

Volatile Keyword

Volatile for Visibility

/**
 * Three small experiments with {@code volatile}: a spin-wait on a flag, a counter
 * that shows volatile does not make {@code ++} atomic, and a polling worker that
 * is stopped through a volatile shutdown flag.
 */
public class VolatileCommunication {
    /** A volatile boolean that one thread sets and another spins on. */
    static class SharedFlag {
        private volatile boolean flag = false;
        private int counter = 0;

        /** Sets the flag and prints which thread did so. */
        public void setFlag() {
            flag = true;
            System.out.println(Thread.currentThread().getName() + ": Flag set to true");
        }

        /**
         * Spins until the flag is observed as true, printing a progress line every
         * million iterations. This is still a busy wait; volatile only guarantees that
         * the update becomes visible to the spinning thread.
         */
        public void waitForFlag() {
            for (;;) {
                if (flag) {
                    break;
                }
                counter++;
                if (counter % 1000000 == 0) {
                    System.out.println(Thread.currentThread().getName() + ": Still waiting...");
                }
            }
            System.out.println(Thread.currentThread().getName() + ": Flag is true, proceeding");
        }

        /** @return the current value of the flag */
        public boolean getFlag() {
            return flag;
        }
    }

    /** Counter whose field is volatile but whose increment is deliberately not atomic. */
    static class VolatileCounter {
        private volatile int count = 0;

        /** Read-modify-write on a volatile field: visible to all threads, but updates can be lost. */
        public void increment() {
            count++;
        }

        /** @return the current count */
        public int getCount() {
            return count;
        }
    }

    /** Worker that polls a volatile slot for data until a volatile shutdown flag is set. */
    static class DataProcessor {
        private volatile boolean shutdownRequested = false;
        private volatile String latestData = null;

        /**
         * Polls every 100 ms, printing and clearing any submitted data, until shutdown
         * is requested or the thread is interrupted; then prints a completion line.
         */
        public void processData() {
            for (;;) {
                if (shutdownRequested) {
                    break;
                }
                if (latestData != null) {
                    String data = latestData;
                    latestData = null; // slot is free for the next submission
                    System.out.println(Thread.currentThread().getName() + ": Processing: " + data);
                }
                // Pause between polls instead of spinning.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println(Thread.currentThread().getName() + ": Shutdown completed");
        }

        /**
         * Publishes data for the worker, replacing whatever was in the slot.
         *
         * @param data value to hand to the worker
         */
        public void submitData(String data) {
            latestData = data;
            System.out.println(Thread.currentThread().getName() + ": Submitted: " + data);
        }

        /** Asks the worker loop to stop at its next check. */
        public void requestShutdown() {
            shutdownRequested = true;
            System.out.println("Shutdown requested");
        }
    }

    /**
     * Runs the three experiments one after another.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws InterruptedException {
        runFlagDemo();
        runCounterDemo();
        runProcessorDemo();
        System.out.println("All volatile tests completed");
    }

    /** Test 1: one thread spins on the flag while another sets it after two seconds. */
    private static void runFlagDemo() throws InterruptedException {
        System.out.println("=== Test 1: Volatile Flag ===");
        SharedFlag sharedFlag = new SharedFlag();
        Thread waiter = new Thread(sharedFlag::waitForFlag, "Waiter");
        Thread setter = new Thread(() -> {
            try {
                Thread.sleep(2000);
                sharedFlag.setFlag();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Setter");
        waiter.start();
        setter.start();
        waiter.join();
        setter.join();
    }

    /** Test 2: ten threads each increment the volatile counter a thousand times. */
    private static void runCounterDemo() throws InterruptedException {
        System.out.println("\n=== Test 2: Volatile Counter (Non-atomic) ===");
        VolatileCounter counter = new VolatileCounter();
        Thread[] incrementers = new Thread[10];
        int idx = 0;
        while (idx < incrementers.length) {
            Thread worker = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            incrementers[idx] = worker;
            worker.start();
            idx++;
        }
        for (Thread t : incrementers) {
            t.join();
        }
        System.out.println("Expected count: 10000");
        System.out.println("Actual count: " + counter.getCount());
        System.out.println("Volatile ensures visibility but not atomicity!");
    }

    /** Test 3: feeds five data items to the polling worker, then shuts it down. */
    private static void runProcessorDemo() throws InterruptedException {
        System.out.println("\n=== Test 3: Data Processor ===");
        DataProcessor processor = new DataProcessor();
        Thread processorThread = new Thread(processor::processData, "Processor");
        processorThread.start();
        for (int i = 1; i <= 5; i++) {
            processor.submitData("Data-" + i);
            Thread.sleep(500);
        }
        processor.requestShutdown();
        processorThread.join();
    }
}

Atomic Variables

Atomic Variables for Lock-Free Communication

import java.util.concurrent.atomic.*;
/**
 * Lock-free coordination with {@code java.util.concurrent.atomic}: shared counters,
 * a worker-status board, and a compare-and-set based stack.
 */
public class AtomicCommunication {
    /** A few independent atomic values that many threads can update without locking. */
    static class AtomicCounter {
        private final AtomicInteger count = new AtomicInteger(0);
        private final AtomicLong total = new AtomicLong(0);
        private final AtomicReference<String> latestMessage = new AtomicReference<>("Initial");

        /** Atomically adds one to the count. */
        public void increment() {
            count.incrementAndGet();
        }

        /**
         * Atomically adds a value to the running total.
         *
         * @param value amount to add
         */
        public void addToTotal(int value) {
            total.addAndGet(value);
        }

        /**
         * Unconditionally replaces the latest message.
         *
         * @param message new message
         */
        public void setMessage(String message) {
            latestMessage.set(message);
        }

        /**
         * Replaces the message only if it is currently the same reference as {@code expected}.
         *
         * @param expected   reference the current message must be identical to
         * @param newMessage message to install
         * @return {@code true} if the message was replaced
         */
        public boolean compareAndSetMessage(String expected, String newMessage) {
            return latestMessage.compareAndSet(expected, newMessage);
        }

        /** Prints count, total and message; the three values are read one after another, not as a snapshot. */
        public void printState() {
            System.out.println("Count: " + count.get() +
                    ", Total: " + total.get() +
                    ", Message: " + latestMessage.get());
        }
    }

    /** Tracks how many workers are active and the last reported status of each. */
    static class AtomicCoordinator {
        private final AtomicInteger activeWorkers = new AtomicInteger(0);
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private final AtomicReferenceArray<String> workerStatus;

        /**
         * @param numWorkers number of status slots; every slot starts as "Initialized"
         */
        public AtomicCoordinator(int numWorkers) {
            workerStatus = new AtomicReferenceArray<>(numWorkers);
            for (int i = 0; i < numWorkers; i++) {
                workerStatus.set(i, "Initialized");
            }
        }

        /**
         * Records that a worker has started.
         *
         * @param workerId index of the worker's status slot
         */
        public void workerStarted(int workerId) {
            // Use the value returned by the increment: a separate get() could already
            // include other workers' updates and print a duplicate or skipped number.
            int active = activeWorkers.incrementAndGet();
            workerStatus.set(workerId, "Running");
            System.out.println("Worker " + workerId + " started. Active: " + active);
        }

        /**
         * Records that a worker has finished.
         *
         * @param workerId index of the worker's status slot
         */
        public void workerCompleted(int workerId) {
            int remaining = activeWorkers.decrementAndGet();
            workerStatus.set(workerId, "Completed");
            System.out.println("Worker " + workerId + " completed. Remaining: " + remaining);
        }

        /** Raises the shutdown flag. */
        public void requestShutdown() {
            shutdownRequested.set(true);
            System.out.println("Shutdown requested");
        }

        /** @return {@code true} once {@link #requestShutdown()} has been called */
        public boolean shouldShutdown() {
            return shutdownRequested.get();
        }

        /** Prints every worker's status on one line. */
        public void printStatus() {
            System.out.print("Worker status: [");
            for (int i = 0; i < workerStatus.length(); i++) {
                System.out.print(workerStatus.get(i));
                if (i < workerStatus.length() - 1) {
                    System.out.print(", ");
                }
            }
            System.out.println("]");
        }
    }

    /**
     * Stack whose top pointer is swapped with compare-and-set instead of being
     * protected by a lock.
     *
     * @param <T> element type
     */
    static class LockFreeStack<T> {
        private static class Node<T> {
            final T value;
            Node<T> next;

            Node(T value) {
                this.value = value;
            }
        }

        private final AtomicReference<Node<T>> top = new AtomicReference<>();

        /**
         * Pushes a value, retrying until the compare-and-set on the top pointer succeeds.
         *
         * @param value value to push
         */
        public void push(T value) {
            Node<T> newHead = new Node<>(value);
            Node<T> oldHead;
            do {
                oldHead = top.get();
                newHead.next = oldHead;
            } while (!top.compareAndSet(oldHead, newHead));
            System.out.println(Thread.currentThread().getName() + ": Pushed " + value);
        }

        /**
         * Pops the most recently pushed value.
         *
         * @return the popped value, or {@code null} if the stack was empty
         */
        public T pop() {
            Node<T> oldHead;
            Node<T> newHead;
            do {
                oldHead = top.get();
                if (oldHead == null) {
                    return null;
                }
                newHead = oldHead.next;
            } while (!top.compareAndSet(oldHead, newHead));
            System.out.println(Thread.currentThread().getName() + ": Popped " + oldHead.value);
            return oldHead.value;
        }

        /** @return {@code true} if the stack had no elements at the moment of the call */
        public boolean isEmpty() {
            return top.get() == null;
        }
    }

    /**
     * Runs the three demonstrations in sequence.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Atomic Counter
        System.out.println("=== Test 1: Atomic Counter ===");
        AtomicCounter counter = new AtomicCounter();
        Thread[] counterThreads = new Thread[5];
        for (int i = 0; i < counterThreads.length; i++) {
            counterThreads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                    counter.addToTotal(j);
                }
            });
            counterThreads[i].start();
        }
        for (Thread t : counterThreads) {
            t.join();
        }
        counter.printState();
        // Test 2: Atomic Coordinator
        System.out.println("\n=== Test 2: Atomic Coordinator ===");
        AtomicCoordinator coordinator = new AtomicCoordinator(3);
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            final int workerId = i;
            workers[i] = new Thread(() -> {
                coordinator.workerStarted(workerId);
                try {
                    Thread.sleep(1000 + workerId * 500);
                    coordinator.workerCompleted(workerId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            });
            workers[i].start();
        }
        Thread.sleep(2000);
        coordinator.printStatus(); // snapshot taken while the slowest worker may still be running
        // Let every worker finish so its output does not spill into the next test.
        for (Thread worker : workers) {
            worker.join();
        }
        // Test 3: Lock-Free Stack
        System.out.println("\n=== Test 3: Lock-Free Stack ===");
        LockFreeStack<Integer> stack = new LockFreeStack<>();
        Thread pusher1 = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                stack.push(i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
            }
        }, "Pusher-1");
        Thread pusher2 = new Thread(() -> {
            for (int i = 6; i <= 10; i++) {
                stack.push(i);
                try {
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
            }
        }, "Pusher-2");
        Thread popper = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                Integer value = stack.pop();
                if (value != null) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                }
            }
        }, "Popper");
        pusher1.start();
        pusher2.start();
        popper.start();
        pusher1.join();
        pusher2.join();
        popper.join();
        System.out.println("Stack empty: " + stack.isEmpty());
        System.out.println("All atomic tests completed");
    }
}

CountDownLatch

CountDownLatch for Coordination

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchCommunication {
/**
 * Tracks start-up and shutdown of a fixed number of services with two latches:
 * one counted down as services become ready, one as they stop.
 */
static class ServiceInitializer {
    private final CountDownLatch initializationLatch;
    private final CountDownLatch shutdownLatch;
    private volatile boolean initialized = false;

    /**
     * @param numServices how many services must report before each latch opens
     */
    public ServiceInitializer(int numServices) {
        this.shutdownLatch = new CountDownLatch(numServices);
        this.initializationLatch = new CountDownLatch(numServices);
    }

    /**
     * Reports one service as ready and prints how many are still outstanding.
     *
     * @param serviceName label used in the printed line
     */
    public void serviceReady(String serviceName) {
        initializationLatch.countDown();
        long outstanding = initializationLatch.getCount();
        System.out.println(serviceName + " is ready. Remaining: " + outstanding);
    }

    /**
     * Reports one service as stopped and prints how many are still outstanding.
     *
     * @param serviceName label used in the printed line
     */
    public void serviceStopped(String serviceName) {
        shutdownLatch.countDown();
        long outstanding = shutdownLatch.getCount();
        System.out.println(serviceName + " stopped. Remaining: " + outstanding);
    }

    /**
     * Blocks until every service has reported ready, then records that fact.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForInitialization() throws InterruptedException {
        System.out.println("Waiting for all services to initialize...");
        initializationLatch.await();
        // The flag is set here, by the waiter, not by the last service to report.
        initialized = true;
        System.out.println("All services initialized! Starting main operation.");
    }

    /**
     * Blocks until every service has reported stopped.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForShutdown() throws InterruptedException {
        System.out.println("Waiting for all services to shutdown...");
        shutdownLatch.await();
        System.out.println("All services shutdown completed.");
    }

    /** @return {@code true} once a call to {@link #waitForInitialization()} has passed the latch */
    public boolean isInitialized() {
        return initialized;
    }
}
/**
 * Start-gate / finish-line pair: workers block on a one-count start latch and
 * count down a done latch when they finish.
 */
static class ParallelProcessor {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final int numWorkers;

    /**
     * @param numWorkers number of workers that must finish before the done latch opens
     */
    public ParallelProcessor(int numWorkers) {
        this.numWorkers = numWorkers;
        this.doneSignal = new CountDownLatch(numWorkers);
        this.startSignal = new CountDownLatch(1);
    }

    /** Opens the start gate, releasing every worker blocked in {@link #workerReady(int)}. */
    public void startProcessing() {
        System.out.println("Starting all workers...");
        startSignal.countDown();
    }

    /**
     * Blocks until every worker has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForCompletion() throws InterruptedException {
        doneSignal.await();
        System.out.println("All workers completed!");
    }

    /**
     * Waits up to the given time for every worker to finish and prints the outcome.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        if (doneSignal.await(timeout, unit)) {
            System.out.println("All workers completed within timeout!");
            return;
        }
        long finished = numWorkers - doneSignal.getCount();
        System.out.println("Timeout! Some workers still running. Completed: " +
                finished + "/" + numWorkers);
    }

    /**
     * Worker body: waits for the start gate, simulates work whose length grows with
     * the worker id, then counts down the done latch.
     *
     * @param workerId number used in the printed lines and to scale the simulated work
     * @throws InterruptedException if the calling thread is interrupted while waiting or working
     */
    public void workerReady(int workerId) throws InterruptedException {
        final String label = "Worker " + workerId;
        System.out.println(label + " waiting for start signal...");
        startSignal.await();
        System.out.println(label + " started processing");
        Thread.sleep(1000 + workerId * 200); // simulated work
        System.out.println(label + " completed");
        doneSignal.countDown();
    }
}
/**
 * Three-latch race simulation: racers report ready, wait for a common start
 * signal, and report their finish.
 */
static class RaceCoordinator {
    private final CountDownLatch readySignal;
    private final CountDownLatch startSignal;
    private final CountDownLatch finishSignal;

    /**
     * @param numRacers number of racers that must report ready and finished
     */
    public RaceCoordinator(int numRacers) {
        this.readySignal = new CountDownLatch(numRacers);
        this.startSignal = new CountDownLatch(1);
        this.finishSignal = new CountDownLatch(numRacers);
    }

    /**
     * Counts down the ready latch for one racer and logs how many racers are still awaited.
     *
     * @param racerName label used in the log line
     */
    public void racerReady(String racerName) {
        readySignal.countDown();
        System.out.println(racerName + " is ready. Waiting: " + readySignal.getCount());
    }

    /**
     * Waits until every racer is ready, pauses three seconds, then opens the start latch.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting or sleeping
     */
    public void startRace() throws InterruptedException {
        System.out.println("Waiting for all racers to be ready...");
        readySignal.await();
        System.out.println("All racers ready! Starting race in 3 seconds...");
        Thread.sleep(3000);
        System.out.println("GO!!!");
        startSignal.countDown();
    }

    /**
     * Body of one racer: waits for the start signal, sleeps for a random
     * 2000-5000 ms "race time", and reports the finish.
     *
     * <p>The finish latch is counted down in a {@code finally} block, so an
     * interrupted racer cannot leave {@link #waitForRaceFinish()} blocked forever.
     *
     * @param racerName label used in the log lines
     * @throws InterruptedException if the racer thread is interrupted
     */
    public void waitForRacer(String racerName) throws InterruptedException {
        try {
            startSignal.await(); // Wait for start signal
            System.out.println(racerName + " is running...");
            // Simulate race time
            long raceTime = 2000 + (long) (Math.random() * 3000);
            Thread.sleep(raceTime);
            System.out.println(racerName + " finished in " + raceTime + "ms");
        } finally {
            finishSignal.countDown();
        }
    }

    /**
     * Blocks until every racer has counted down the finish latch.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForRaceFinish() throws InterruptedException {
        finishSignal.await();
        System.out.println("Race completed! All racers finished.");
    }
}
/**
 * Runs the three CountDownLatch demonstrations one after another: service
 * initialization/shutdown, parallel workers behind a start gate, and a race
 * with ready/start/finish signals.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting or sleeping
 */
public static void main(String[] args) throws InterruptedException {
    // Test 1: Service Initialization
    System.out.println("=== Test 1: Service Initialization ===");
    final int serviceCount = 3;
    ServiceInitializer initializer = new ServiceInitializer(serviceCount);
    for (int id = 1; id <= serviceCount; id++) {
        final int serviceId = id;
        Runnable lifecycle = () -> {
            try {
                // Simulate initialization time, staggered per service
                Thread.sleep(serviceId * 1000);
                initializer.serviceReady("Service-" + serviceId);
                // Simulate the service running for a while before it stops
                Thread.sleep(2000);
                initializer.serviceStopped("Service-" + serviceId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        new Thread(lifecycle).start();
    }
    initializer.waitForInitialization();
    System.out.println("Main operation running...");
    Thread.sleep(1000);
    initializer.waitForShutdown();

    // Test 2: Parallel Processing
    System.out.println("\n=== Test 2: Parallel Processing ===");
    final int workerCount = 4;
    ParallelProcessor processor = new ParallelProcessor(workerCount);
    for (int id = 1; id <= workerCount; id++) {
        final int workerId = id;
        Thread worker = new Thread(() -> {
            try {
                processor.workerReady(workerId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        worker.start();
    }
    // Give the workers time to reach the start gate
    Thread.sleep(1000);
    processor.startProcessing();
    processor.waitForCompletion();

    // Test 3: Race Coordination
    System.out.println("\n=== Test 3: Race Coordination ===");
    final int racerCount = 4;
    RaceCoordinator race = new RaceCoordinator(racerCount);
    for (int id = 1; id <= racerCount; id++) {
        final int racerId = id;
        Thread racer = new Thread(() -> {
            try {
                race.racerReady("Racer-" + racerId);
                race.waitForRacer("Racer-" + racerId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        racer.start();
    }
    race.startRace();
    race.waitForRaceFinish();

    System.out.println("All CountDownLatch tests completed");
}
}

CyclicBarrier

CyclicBarrier for Synchronization

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demonstrations of {@link CyclicBarrier}: phased matrix processing, tournament
 * rounds with a timed barrier, partial-result aggregation in the barrier action,
 * and a barrier whose workers may drop out.
 */
public class CyclicBarrierCommunication {

    /**
     * Re-asserts the current thread's interrupt flag when {@code e} is an
     * {@link InterruptedException}, so callers further up can still observe the interrupt.
     */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Workers run three phases each and meet at a shared barrier after every phase. */
    static class MatrixProcessor {
        private final CyclicBarrier barrier;
        private final int[][] matrix;
        private final int numWorkers;

        /**
         * @param matrix     stored but not read by the simulated processing
         * @param numWorkers number of parties the barrier waits for
         */
        public MatrixProcessor(int[][] matrix, int numWorkers) {
            this.matrix = matrix;
            this.numWorkers = numWorkers;
            this.barrier = new CyclicBarrier(numWorkers, () -> {
                System.out.println("All workers completed current phase! Starting next phase...");
            });
        }

        /**
         * Runs three simulated phases, waiting at the barrier after each one.
         * Stops early if the thread is interrupted or the barrier breaks.
         *
         * @param workerId identifier used in log lines and to stagger the simulated work
         * @param startRow first row of this worker's range (only logged)
         * @param endRow   last row of this worker's range (only logged)
         */
        public void processRow(int workerId, int startRow, int endRow) {
            try {
                for (int phase = 1; phase <= 3; phase++) {
                    System.out.println("Worker " + workerId + " starting phase " + phase +
                            " (rows " + startRow + "-" + endRow + ")");
                    // Simulate processing
                    Thread.sleep(1000 + workerId * 100);
                    System.out.println("Worker " + workerId + " completed phase " + phase +
                            ", waiting at barrier...");
                    // Wait for all workers to complete this phase
                    barrier.await();
                    System.out.println("Worker " + workerId + " passed barrier for phase " + phase);
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                restoreInterruptIfNeeded(e);
                e.printStackTrace();
            }
        }
    }

    /**
     * Players meet at a barrier after each round; the barrier action logs and
     * advances the round counter.
     *
     * <p>{@code currentRound} is a plain field: it is written only by the barrier
     * action and read by players before and after {@code await()}.
     */
    static class TournamentOrganizer {
        private final CyclicBarrier roundBarrier;
        private final int numPlayers;
        private int currentRound = 1;

        /**
         * @param numPlayers number of parties the round barrier waits for
         */
        public TournamentOrganizer(int numPlayers) {
            this.numPlayers = numPlayers;
            this.roundBarrier = new CyclicBarrier(numPlayers, () -> {
                System.out.println("\n=== Round " + currentRound + " completed ===");
                currentRound++;
            });
        }

        /**
         * Plays one simulated round and waits up to five seconds for the other players.
         * A player that is interrupted, times out, or finds the barrier broken is
         * logged as eliminated.
         *
         * @param playerName label used in the log lines
         */
        public void playRound(String playerName) {
            try {
                System.out.println(playerName + " preparing for round " + currentRound);
                Thread.sleep(500 + (long) (Math.random() * 1000));
                System.out.println(playerName + " completed round " + currentRound +
                        ", waiting for others...");
                roundBarrier.await(5, TimeUnit.SECONDS); // Timeout to prevent deadlock
                System.out.println(playerName + " advancing to next round");
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                restoreInterruptIfNeeded(e);
                System.out.println(playerName + " eliminated: " + e.getClass().getSimpleName());
            }
        }
    }

    /**
     * Each worker stores a partial sum, then all meet at a barrier whose action
     * adds the partial sums into {@code finalResult}.
     */
    static class DataAggregator {
        private final CyclicBarrier aggregationBarrier;
        private final double[] partialResults;
        private double finalResult = 0;

        /**
         * @param numWorkers number of workers; also the number of partial-result slots
         */
        public DataAggregator(int numWorkers) {
            this.partialResults = new double[numWorkers];
            this.aggregationBarrier = new CyclicBarrier(numWorkers, () -> {
                // This runs when all threads reach the barrier
                aggregateResults();
            });
        }

        /**
         * Sums {@code dataChunk}, stores the sum in slot {@code workerId}, waits for
         * the other workers, then logs the aggregated result.
         *
         * @param workerId  index of this worker's slot in the partial-result array
         * @param dataChunk values to sum
         */
        public void processData(int workerId, double[] dataChunk) {
            try {
                // Phase 1: Process data chunk
                double partialSum = 0;
                for (double value : dataChunk) {
                    partialSum += value;
                }
                partialResults[workerId] = partialSum;
                System.out.println("Worker " + workerId + " computed partial sum: " + partialSum);
                aggregationBarrier.await(); // Wait for all workers
                // Phase 2: All workers can now use the final result
                System.out.println("Worker " + workerId + " using final result: " + finalResult);
            } catch (InterruptedException | BrokenBarrierException e) {
                restoreInterruptIfNeeded(e);
                e.printStackTrace();
            }
        }

        /** Recomputes {@code finalResult} as the sum of all partial results. */
        private void aggregateResults() {
            finalResult = 0;
            for (double result : partialResults) {
                finalResult += result;
            }
            System.out.println("Aggregated final result: " + finalResult);
        }
    }

    /**
     * Barrier demo in which a worker may randomly "fail" and stop arriving.
     *
     * <p>A {@link CyclicBarrier} cannot shrink its party count, so once a worker has
     * dropped out the barrier can no longer trip. The remaining workers therefore use
     * a timed {@code await}: they give up after {@value #BARRIER_TIMEOUT_SECONDS}
     * seconds instead of blocking until someone calls {@link #reset()}.
     */
    static class ResilientBarrier {
        /** Longest time a worker waits at the barrier before giving up. */
        private static final long BARRIER_TIMEOUT_SECONDS = 5;

        private final CyclicBarrier barrier;
        private final int totalWorkers;
        // Decremented by worker threads and read by the barrier action, hence atomic.
        private final AtomicInteger activeWorkers;

        /**
         * @param totalWorkers number of parties the barrier waits for
         */
        public ResilientBarrier(int totalWorkers) {
            this.totalWorkers = totalWorkers;
            this.activeWorkers = new AtomicInteger(totalWorkers);
            this.barrier = new CyclicBarrier(totalWorkers, () -> {
                System.out.println("Barrier tripped! Active workers: " + activeWorkers.get());
            });
        }

        /**
         * Runs up to three iterations, each a one-second sleep followed by a wait at
         * the barrier. With 10% probability per iteration the worker "fails": it
         * decrements the active count and returns without arriving.
         *
         * @param workerName label used in the log lines
         */
        public void doWork(String workerName) {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(workerName + " working on iteration " + i);
                    Thread.sleep(1000);
                    // Simulate occasional worker failure
                    if (Math.random() < 0.1) { // 10% chance of failure
                        System.out.println(workerName + " failed! Removing from barrier.");
                        activeWorkers.decrementAndGet();
                        return;
                    }
                    System.out.println(workerName + " waiting at barrier (iteration " + i + ")");
                    // Timed wait: a peer that failed will never arrive
                    barrier.await(BARRIER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                restoreInterruptIfNeeded(e);
                // getMessage() is null for barrier exceptions, so report the exception type
                System.out.println(workerName + " stopped: " + e.getClass().getSimpleName());
            }
        }

        /** Resets the barrier and restores the active-worker count to {@code totalWorkers}. */
        public void reset() {
            barrier.reset();
            activeWorkers.set(totalWorkers);
            System.out.println("Barrier reset to " + totalWorkers + " workers");
        }
    }

    /**
     * Runs the four demos in sequence, separating them with fixed sleeps rather
     * than joining the worker threads.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Matrix Processing
        System.out.println("=== Test 1: Matrix Processing ===");
        int[][] matrix = new int[10][10];
        MatrixProcessor processor = new MatrixProcessor(matrix, 3);
        for (int i = 0; i < 3; i++) {
            final int workerId = i;
            final int startRow = i * 3;
            final int endRow = startRow + 2;
            new Thread(() -> {
                processor.processRow(workerId, startRow, endRow);
            }).start();
        }
        Thread.sleep(5000);

        // Test 2: Tournament
        System.out.println("\n=== Test 2: Tournament ===");
        TournamentOrganizer tournament = new TournamentOrganizer(4);
        for (int i = 1; i <= 4; i++) {
            final String playerName = "Player-" + i;
            new Thread(() -> {
                for (int round = 1; round <= 3; round++) {
                    tournament.playRound(playerName);
                }
            }).start();
        }
        Thread.sleep(8000);

        // Test 3: Data Aggregation
        System.out.println("\n=== Test 3: Data Aggregation ===");
        DataAggregator aggregator = new DataAggregator(3);
        for (int i = 0; i < 3; i++) {
            final int workerId = i;
            double[] dataChunk = {1.0, 2.0, 3.0, 4.0, 5.0};
            new Thread(() -> {
                aggregator.processData(workerId, dataChunk);
            }).start();
        }
        Thread.sleep(3000);

        // Test 4: Resilient Barrier
        System.out.println("\n=== Test 4: Resilient Barrier ===");
        ResilientBarrier resilient = new ResilientBarrier(3);
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Worker-" + i;
            new Thread(() -> {
                resilient.doWork(workerName);
            }).start();
        }
        Thread.sleep(5000);
        resilient.reset();
        System.out.println("All CyclicBarrier tests completed");
    }
}

Phaser

Phaser for Flexible Synchronization

import java.util.concurrent.Phaser;
/**
 * Demonstrations of {@link Phaser}: a fixed number of phases, parties that
 * register dynamically, and a second tier that is released by an external signal.
 */
public class PhaserCommunication {

    /** Workers advance together through {@code numPhases} phases, then the phaser terminates. */
    static class MultiPhaseProcessor {
        private final Phaser phaser;
        private final int numPhases;

        /**
         * @param numWorkers number of parties registered with the phaser
         * @param numPhases  number of phases to run; the phaser terminates when phase
         *                   {@code numPhases - 1} completes or no party is left
         */
        public MultiPhaseProcessor(int numWorkers, int numPhases) {
            this.numPhases = numPhases;
            this.phaser = new Phaser(numWorkers) {
                @Override
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("Phase " + phase + " completed. " +
                            "Registered parties: " + registeredParties);
                    return phase >= numPhases - 1 || registeredParties == 0;
                }
            };
        }

        /**
         * Runs this worker through the remaining phases, meeting the other workers
         * at the end of each one. An interrupted worker deregisters and stops.
         *
         * @param workerName label used in the log lines
         */
        public void processPhase(String workerName) {
            int phase;
            // getPhase() is negative once the phaser has terminated; without the sign
            // check a negative value would stay below numPhases and the loop would never end.
            while ((phase = phaser.getPhase()) >= 0 && phase < numPhases) {
                System.out.println(workerName + " starting phase " + phase);
                try {
                    // Simulate work
                    Thread.sleep(500 + (long) (Math.random() * 500));
                    System.out.println(workerName + " completed phase " + phase);
                    // Wait for all workers to complete this phase
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    System.out.println(workerName + " interrupted, deregistering");
                    phaser.arriveAndDeregister();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println(workerName + " completed all phases");
        }
    }

    /**
     * A coordinator party plus workers that register themselves while the phaser is running.
     * The phaser starts with one registered party, which {@link #coordinateWork(String)} uses.
     */
    static class DynamicWorkforce {
        private final Phaser phaser;

        public DynamicWorkforce() {
            this.phaser = new Phaser(1); // Register self
        }

        /**
         * Registers one additional party. No thread is started here; the caller is
         * responsible for arrivals on behalf of that party.
         *
         * @param workerName label used in the log line
         */
        public void addWorker(String workerName) {
            phaser.register();
            System.out.println("Added " + workerName + ". Total workers: " + phaser.getRegisteredParties());
        }

        /**
         * Arrives and deregisters one party on behalf of {@code workerName}.
         *
         * <p>NOTE(review): this only changes the party count. It does not stop a thread
         * running {@link #workerTask(String)}, which keeps arriving. Pair it with
         * {@link #addWorker(String)} or make sure the named worker has stopped.
         *
         * @param workerName label used in the log line
         */
        public void removeWorker(String workerName) {
            phaser.arriveAndDeregister();
            System.out.println("Removed " + workerName + ". Total workers: " + phaser.getRegisteredParties());
        }

        /**
         * Drives three phases as the coordinator party, then terminates the phaser.
         *
         * <p>Termination happens in a {@code finally} block. Without it the
         * coordinator's party would stay registered but never arrive again, and every
         * worker would block in {@code arriveAndAwaitAdvance()} forever.
         *
         * @param coordinatorName label used in the log lines
         */
        public void coordinateWork(String coordinatorName) {
            try {
                for (int phase = 0; phase < 3; phase++) {
                    System.out.println(coordinatorName + " starting phase " + phase);
                    // Wait for workers to complete; the return value is the phase
                    // number the phaser advanced to, not a count of arrivals.
                    int nextPhase = phaser.arriveAndAwaitAdvance();
                    System.out.println(coordinatorName + ": Phase " + phase +
                            " completed. Next phase: " + nextPhase);
                    // Simulate coordination work
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Releases every waiting worker and ends their isTerminated() loops
                phaser.forceTermination();
            }
        }

        /**
         * Registers the calling thread as a party and keeps working phase after phase
         * until the phaser terminates. An interrupted worker deregisters and stops.
         *
         * @param workerName label used in the log lines
         */
        public void workerTask(String workerName) {
            phaser.register(); // Register this worker
            try {
                while (!phaser.isTerminated()) {
                    int phase = phaser.getPhase();
                    System.out.println(workerName + " working on phase " + phase);
                    Thread.sleep(800 + (long) (Math.random() * 400));
                    System.out.println(workerName + " completed phase " + phase);
                    phaser.arriveAndAwaitAdvance();
                }
            } catch (InterruptedException e) {
                phaser.arriveAndDeregister();
                Thread.currentThread().interrupt();
            }
            System.out.println(workerName + " terminated");
        }
    }

    /**
     * Two independent worker tiers. Tier 1 workers synchronise among themselves;
     * tier 2 workers additionally wait for {@link #startTier2()} before their first batch.
     */
    static class TieredProcessing {
        private final Phaser tier1Phaser;
        private final Phaser tier2Phaser;
        // Guarded by "this"; makes startTier2() safe to call more than once.
        private boolean tier2Started;

        /**
         * @param tier1Workers number of tier 1 parties
         * @param tier2Workers number of tier 2 parties; one extra party is registered
         *                     for the controller that calls {@link #startTier2()}
         */
        public TieredProcessing(int tier1Workers, int tier2Workers) {
            this.tier1Phaser = new Phaser(tier1Workers);
            // +1: the controller is a real party of the first phase, so tier 2
            // cannot start before startTier2() has been called.
            this.tier2Phaser = new Phaser(tier2Workers + 1);
        }

        /**
         * Processes two batches, meeting the other tier 1 workers after each one.
         *
         * @param workerName label used in the log lines
         */
        public void tier1Worker(String workerName) {
            try {
                for (int i = 0; i < 2; i++) {
                    System.out.println(workerName + " (Tier 1) processing batch " + i);
                    Thread.sleep(1000);
                    tier1Phaser.arriveAndAwaitAdvance();
                    System.out.println(workerName + " (Tier 1) completed batch " + i);
                    // Signal tier 2 to start (log only; the actual release is startTier2())
                    if (i == 0) {
                        System.out.println(workerName + " signaling Tier 2 to start");
                    }
                }
            } catch (InterruptedException e) {
                tier1Phaser.arriveAndDeregister();
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Processes two batches. The first batch starts only after every tier 2 worker
         * has arrived and {@link #startTier2()} has been called; the second batch is
         * synchronised among the tier 2 workers alone.
         *
         * @param workerName label used in the log lines
         */
        public void tier2Worker(String workerName) {
            try {
                // Wait for tier 1 to complete first batch
                System.out.println(workerName + " (Tier 2) waiting for Tier 1...");
                for (int i = 0; i < 2; i++) {
                    tier2Phaser.arriveAndAwaitAdvance();
                    System.out.println(workerName + " (Tier 2) processing batch " + i);
                    Thread.sleep(800);
                    System.out.println(workerName + " (Tier 2) completed batch " + i);
                }
            } catch (InterruptedException e) {
                tier2Phaser.arriveAndDeregister();
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Releases tier 2. The controller arrives and deregisters in one step, so it
         * never blocks and later tier 2 phases do not wait for it. Calls after the
         * first one are ignored.
         */
        public synchronized void startTier2() {
            if (tier2Started) {
                return;
            }
            tier2Started = true;
            tier2Phaser.arriveAndDeregister();
        }
    }

    /** A {@link Phaser} that logs every phase advance under a given name. */
    static class MonitoringPhaser extends Phaser {
        private final String name;

        /**
         * @param parties number of initially registered parties
         * @param name    label used in the log lines
         */
        public MonitoringPhaser(int parties, String name) {
            super(parties);
            this.name = name;
        }

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println(name + " - Phase " + phase +
                    " advance. Registered: " + registeredParties);
            return super.onAdvance(phase, registeredParties);
        }
    }

    /**
     * Runs the three demos in sequence, separating them with fixed sleeps rather
     * than joining the worker threads.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Multi-Phase Processing
        System.out.println("=== Test 1: Multi-Phase Processing ===");
        MultiPhaseProcessor processor = new MultiPhaseProcessor(3, 4);
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Worker-" + i;
            new Thread(() -> {
                processor.processPhase(workerName);
            }).start();
        }
        Thread.sleep(5000);

        // Test 2: Dynamic Workforce
        System.out.println("\n=== Test 2: Dynamic Workforce ===");
        DynamicWorkforce workforce = new DynamicWorkforce();
        // Start coordinator
        new Thread(() -> {
            workforce.coordinateWork("Coordinator");
        }).start();
        Thread.sleep(500);
        // Add workers dynamically
        for (int i = 1; i <= 3; i++) {
            final String workerName = "Dynamic-Worker-" + i;
            new Thread(() -> {
                workforce.workerTask(workerName);
            }).start();
            Thread.sleep(300);
        }
        Thread.sleep(2000);
        // Remove a worker
        workforce.removeWorker("Dynamic-Worker-2");
        Thread.sleep(3000);

        // Test 3: Tiered Processing
        System.out.println("\n=== Test 3: Tiered Processing ===");
        TieredProcessing tiered = new TieredProcessing(2, 2);
        for (int i = 1; i <= 2; i++) {
            final String workerName = "T1-Worker-" + i;
            new Thread(() -> {
                tiered.tier1Worker(workerName);
            }).start();
        }
        for (int i = 1; i <= 2; i++) {
            final String workerName = "T2-Worker-" + i;
            new Thread(() -> {
                tiered.tier2Worker(workerName);
            }).start();
        }
        // Start tier 2 after a delay
        Thread.sleep(1500);
        tiered.startTier2();
        Thread.sleep(4000);
        System.out.println("All Phaser tests completed");
    }
}

Exchanger

Exchanger for Data Exchange

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Demonstrations of {@link Exchanger}: request/response between a producer and a
 * consumer, two nodes swapping a resource, a ring of participants passing
 * messages, and two endpoints swapping message objects.
 */
public class ExchangerCommunication {

    /**
     * Producer and consumer sharing one exchanger. Every item takes two exchanges:
     * the producer hands over the data (the consumer offers {@code null}), then the
     * consumer hands back the response (the producer offers {@code null}).
     */
    static class DataProducerConsumer {
        private final Exchanger<String> exchanger;
        private volatile boolean running = true;

        public DataProducerConsumer() {
            this.exchanger = new Exchanger<>();
        }

        /**
         * Produces five items, waiting for the consumer's response to each one, then
         * clears the {@code running} flag.
         *
         * @param producerName label used in the data and in log lines
         */
        public void producer(String producerName) {
            try {
                for (int i = 1; i <= 5; i++) {
                    String data = producerName + "-Data-" + i;
                    System.out.println(producerName + " produced: " + data);
                    // First exchange: hand the data to the consumer
                    exchanger.exchange(data);
                    // Second exchange: collect the response for that same item. A single
                    // exchange here would pair the response with the next item's hand-off.
                    String response = exchanger.exchange(null);
                    System.out.println(producerName + " received response: " + response);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Also reached on interrupt, so the consumer is not left polling
                running = false;
            }
        }

        /**
         * Receives items while the {@code running} flag is set, answering each with
         * {@code "Processed-" + data}. Both exchanges time out after two seconds, which
         * is also how the consumer stops once the producer has gone away.
         *
         * @param consumerName label used in the log lines
         */
        public void consumer(String consumerName) {
            try {
                while (running) {
                    // Wait for data from producer
                    String data = exchanger.exchange(null, 2, TimeUnit.SECONDS);
                    if (data != null) {
                        System.out.println(consumerName + " received: " + data);
                        // Process data and send response
                        String response = "Processed-" + data;
                        System.out.println(consumerName + " sending response: " + response);
                        exchanger.exchange(response, 2, TimeUnit.SECONDS);
                    }
                }
            } catch (InterruptedException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(consumerName + " stopped: " + e.getClass().getSimpleName());
            }
        }
    }

    /** Two nodes that swap an integer resource three times, each modifying it in between. */
    static class ResourceExchanger {
        private final Exchanger<Integer> resourceExchanger;

        public ResourceExchanger() {
            this.resourceExchanger = new Exchanger<>();
        }

        /**
         * Starts with 100, swaps with node B three times, adding 10 after each swap.
         *
         * @param nodeName label used in the log lines
         */
        public void nodeA(String nodeName) {
            try {
                int resource = 100; // Initial resource
                for (int i = 0; i < 3; i++) {
                    System.out.println(nodeName + " has resource: " + resource);
                    // Exchange resource with node B
                    System.out.println(nodeName + " exchanging resource...");
                    resource = resourceExchanger.exchange(resource);
                    System.out.println(nodeName + " received resource: " + resource);
                    resource += 10; // Modify resource
                    Thread.sleep(1500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        /**
         * Starts with 200, swaps with node A three times, subtracting 5 after each swap.
         *
         * @param nodeName label used in the log lines
         */
        public void nodeB(String nodeName) {
            try {
                int resource = 200; // Initial resource
                for (int i = 0; i < 3; i++) {
                    System.out.println(nodeName + " has resource: " + resource);
                    // Exchange resource with node A
                    System.out.println(nodeName + " exchanging resource...");
                    resource = resourceExchanger.exchange(resource);
                    System.out.println(nodeName + " received resource: " + resource);
                    resource -= 5; // Modify resource
                    Thread.sleep(1200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Ring of participants: participant {@code i} sends to {@code (i + 1) % n} and
     * receives from {@code (i - 1 + n) % n}.
     *
     * <p>{@code exchangers[i]} is the link from participant {@code i} to its successor.
     * On each link the sender offers the message and the receiver offers {@code null}.
     */
    static class MultiPartyExchange {
        private final Exchanger<String>[] exchangers;

        /**
         * @param numParties number of participants in the ring
         */
        @SuppressWarnings("unchecked")
        public MultiPartyExchange(int numParties) {
            exchangers = new Exchanger[numParties];
            for (int i = 0; i < numParties; i++) {
                exchangers[i] = new Exchanger<>();
            }
        }

        /**
         * Runs one participant: in every round it sends {@code messages[round]} to the
         * next participant and receives the previous participant's message.
         *
         * <p>All participants must run the same number of rounds; a participant whose
         * neighbour has already finished would wait forever.
         *
         * @param id       this participant's index in the ring
         * @param messages one message per round
         */
        public void participant(int id, String[] messages) {
            final int parties = exchangers.length;
            if (parties < 2) {
                // A lone participant has no partner; exchange() would block forever
                System.out.println("Participant " + id + " has no peer to exchange with");
                return;
            }
            final Exchanger<String> toNext = exchangers[id];
            final Exchanger<String> fromPrevious = exchangers[(id + parties - 1) % parties];
            // Even ids send first and odd ids receive first. If everyone sent first,
            // each participant would wait on its successor and the ring would deadlock.
            final boolean sendFirst = id % 2 == 0;
            try {
                for (int round = 0; round < messages.length; round++) {
                    String messageToSend = messages[round];
                    System.out.println("Participant " + id + " sending: " + messageToSend);
                    String receivedMessage;
                    if (sendFirst) {
                        toNext.exchange(messageToSend);
                        receivedMessage = fromPrevious.exchange(null);
                    } else {
                        receivedMessage = fromPrevious.exchange(null);
                        toNext.exchange(messageToSend);
                    }
                    System.out.println("Participant " + id + " received: " + receivedMessage);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Two endpoints that swap {@link Message} objects four times over one exchanger. */
    static class BidirectionalCommunication {
        private final Exchanger<Message> exchanger;

        public BidirectionalCommunication() {
            this.exchanger = new Exchanger<>();
        }

        /** Immutable text message with a sequence number. */
        static class Message {
            final String content;
            final int sequence;

            Message(String content, int sequence) {
                this.content = content;
                this.sequence = sequence;
            }

            @Override
            public String toString() {
                return "Message{seq=" + sequence + ", content='" + content + "'}";
            }
        }

        /**
         * Sends four "Hello from ..." messages, receiving the peer's message in each exchange.
         *
         * @param name label used in the message content and log lines
         */
        public void endpointA(String name) {
            try {
                for (int i = 1; i <= 4; i++) {
                    Message outbound = new Message("Hello from " + name, i);
                    System.out.println(name + " sending: " + outbound);
                    Message inbound = exchanger.exchange(outbound);
                    System.out.println(name + " received: " + inbound);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        /**
         * Sends four "Response from ..." messages, receiving the peer's message in each exchange.
         *
         * @param name label used in the message content and log lines
         */
        public void endpointB(String name) {
            try {
                for (int i = 1; i <= 4; i++) {
                    Message outbound = new Message("Response from " + name, i);
                    System.out.println(name + " sending: " + outbound);
                    Message inbound = exchanger.exchange(outbound);
                    System.out.println(name + " received: " + inbound);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs the four demos in sequence, joining the threads of each demo before
     * starting the next.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Test 1: Producer-Consumer with Exchanger
        System.out.println("=== Test 1: Producer-Consumer Exchange ===");
        DataProducerConsumer pc = new DataProducerConsumer();
        Thread producer = new Thread(() -> {
            pc.producer("Producer");
        });
        Thread consumer = new Thread(() -> {
            pc.consumer("Consumer");
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        // Test 2: Resource Exchange
        System.out.println("\n=== Test 2: Resource Exchange ===");
        ResourceExchanger resourceEx = new ResourceExchanger();
        Thread nodeA = new Thread(() -> {
            resourceEx.nodeA("Node-A");
        });
        Thread nodeB = new Thread(() -> {
            resourceEx.nodeB("Node-B");
        });
        nodeA.start();
        nodeB.start();
        nodeA.join();
        nodeB.join();

        // Test 3: Multi-party Exchange
        System.out.println("\n=== Test 3: Multi-party Exchange ===");
        MultiPartyExchange multiExchange = new MultiPartyExchange(3);
        String[][] messages = {
            {"Hello from 0", "Second from 0", "Third from 0"},
            {"Hi from 1", "Second from 1", "Third from 1"},
            {"Hey from 2", "Second from 2", "Third from 2"}
        };
        Thread[] participants = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int participantId = i;
            participants[i] = new Thread(() -> {
                multiExchange.participant(participantId, messages[participantId]);
            });
            participants[i].start();
        }
        for (Thread t : participants) {
            t.join();
        }

        // Test 4: Bidirectional Communication
        System.out.println("\n=== Test 4: Bidirectional Communication ===");
        BidirectionalCommunication bidirectional = new BidirectionalCommunication();
        Thread endpoint1 = new Thread(() -> {
            bidirectional.endpointA("Endpoint-A");
        });
        Thread endpoint2 = new Thread(() -> {
            bidirectional.endpointB("Endpoint-B");
        });
        endpoint1.start();
        endpoint2.start();
        endpoint1.join();
        endpoint2.join();
        System.out.println("All Exchanger tests completed");
    }
}

Semaphore

Semaphore for Resource Management

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreCommunication {

/**
 * Fixed-size pool of numbered resources. A fair {@link Semaphore} bounds how many
 * resources are handed out; a boolean array records which indices are free.
 */
static class ResourcePool {
    private final Semaphore semaphore;
    private final boolean[] resources;
    private final int poolSize;

    /**
     * @param poolSize number of resources; all start out available
     */
    public ResourcePool(int poolSize) {
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(poolSize, true); // Fair semaphore
        this.resources = new boolean[poolSize];
        // Initialize all resources as available
        for (int i = 0; i < poolSize; i++) {
            resources[i] = true;
        }
    }

    /**
     * Blocks until a resource is available and claims it.
     *
     * @return index of the claimed resource
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public int acquireResource() throws InterruptedException {
        semaphore.acquire();
        return getNextAvailableResource();
    }

    /**
     * Tries to claim a resource within the given time.
     *
     * <p>This method does not report which resource was claimed, so the caller has
     * no index to pass to {@link #releaseResource(int)}. Prefer
     * {@link #tryAcquireResourceId(long, TimeUnit)} when the resource must be released.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if a resource was claimed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean tryAcquireResource(long timeout, TimeUnit unit) throws InterruptedException {
        return tryAcquireResourceId(timeout, unit) != -1;
    }

    /**
     * Tries to claim a resource within the given time and reports its index.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return index of the claimed resource, or {@code -1} if none was claimed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public int tryAcquireResourceId(long timeout, TimeUnit unit) throws InterruptedException {
        if (!semaphore.tryAcquire(timeout, unit)) {
            return -1;
        }
        return getNextAvailableResource();
    }

    /**
     * Returns a resource to the pool. Releasing an index that is already
     * available is ignored and does not add a permit.
     *
     * @param resourceId index previously returned by an acquire method
     */
    public void releaseResource(int resourceId) {
        // Mark the slot free before releasing the permit, so a thread that obtains
        // the permit always finds a free slot.
        if (markAsAvailable(resourceId)) {
            semaphore.release();
        }
    }

    /** Claims the lowest-numbered free slot, or returns -1 if none is free. */
    private synchronized int getNextAvailableResource() {
        for (int i = 0; i < poolSize; i++) {
            if (resources[i]) {
                resources[i] = false;
                System.out.println(Thread.currentThread().getName() + " acquired resource " + i);
                return i;
            }
        }
        return -1; // Should not happen with proper semaphore usage
    }

    /** Marks a busy slot as free; returns false if it was already free. */
    private synchronized boolean markAsAvailable(int resourceId) {
        if (!resources[resourceId]) {
            resources[resourceId] = true;
            System.out.println(Thread.currentThread().getName() + " released resource " + resourceId);
            return true;
        }
        return false;
    }

    /**
     * @return number of permits currently available in the semaphore
     */
    public int availablePermits() {
        return semaphore.availablePermits();
    }
}
/**
 * Permit-based rate limiter: callers take permits from a {@link Semaphore}, and a
 * background thread tops the semaphore back up to {@code maxPermits} once per second.
 *
 * <p>The refill thread is a daemon, so it does not keep the JVM alive, and it can
 * be stopped explicitly with {@link #shutdown()}.
 */
static class RateLimiter {
    private final Semaphore semaphore;
    private final int maxPermits;
    private final Thread refillThread;

    /**
     * Creates the limiter and starts its refill thread.
     *
     * @param maxPermits number of permits available after each refill
     */
    public RateLimiter(int maxPermits) {
        this.maxPermits = maxPermits;
        this.semaphore = new Semaphore(maxPermits);
        // Refill permits periodically
        this.refillThread = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000); // Refill every second
                    refillPermits();
                } catch (InterruptedException e) {
                    break; // shutdown() interrupts this thread to stop it
                }
            }
        }, "rate-limiter-refill");
        // Daemon: an idle limiter must not prevent the program from exiting
        this.refillThread.setDaemon(true);
        this.refillThread.start();
    }

    /**
     * @return {@code true} if a permit was available and has been taken
     */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    /**
     * Blocks until a permit is available and takes it.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }

    /** Stops the refill thread. Permits already available remain usable; no more are added. */
    public void shutdown() {
        refillThread.interrupt();
    }

    /** Releases enough permits to bring the available count back up to {@code maxPermits}. */
    private synchronized void refillPermits() {
        int currentPermits = semaphore.availablePermits();
        if (currentPermits < maxPermits) {
            int refillAmount = maxPermits - currentPermits;
            semaphore.release(refillAmount);
            System.out.println("Refilled " + refillAmount + " permits. Total: " +
                    semaphore.availablePermits());
        }
    }

    /**
     * @return number of permits currently available
     */
    public int getAvailablePermits() {
        return semaphore.availablePermits();
    }
}
static class BoundedBuffer<T> {
private final Semaphore emptySlots;
private final Semaphore fullSlots;
private final Semaphore mutex;
private final T[] buffer;
private int putIndex = 0;
private int takeIndex = 0;
private int count = 0;
@SuppressWarnings("unchecked")
public BoundedBuffer(int capacity) {
this.buffer = (T[]) new Object[capacity];
this.emptySlots = new Semaphore(capacity);
this.fullSlots = new Semaphore(0);
this.mutex = new Semaphore(1); // Binary semaphore as mutex
}
public void put(T item) throws InterruptedException {
emptySlots.acquire(); // Wait for empty slot
mutex.acquire(); // Enter critical section
try {
buffer[putIndex] = item;
putIndex = (putIndex + 1) % buffer.length;
count++;
System.out.println(Thread.currentThread().getName() + " put: " + item + 
" | Count: " + count);
} finally {
mutex.release(); // Exit critical section
fullSlots.release(); //

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper