The Exchanger class in Java is a synchronization point where two threads can exchange objects. It's part of the java.util.concurrent package and is useful for pipeline-like designs where threads need to hand off data to each other.
Basic Usage
import java.util.concurrent.Exchanger;
public class BasicExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread thread1 = new Thread(new Producer(exchanger));
Thread thread2 = new Thread(new Consumer(exchanger));
thread1.start();
thread2.start();
}
}
class Producer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "Hello from Producer";
public Producer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("Producer sending: " + data);
// Exchange data with consumer and receive response
String response = exchanger.exchange(data);
System.out.println("Producer received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final Exchanger<String> exchanger;
private String data = "Hello from Consumer";
public Consumer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("Consumer sending: " + data);
// Exchange data with producer and receive response
String response = exchanger.exchange(data);
System.out.println("Consumer received: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Real-World Example: Producer-Consumer Pipeline
import java.util.concurrent.Exchanger;
import java.util.ArrayList;
import java.util.List;
class DataBuffer {
private List<Integer> data = new ArrayList<>();
public void add(int value) {
data.add(value);
}
public List<Integer> getData() {
return new ArrayList<>(data);
}
public void clear() {
data.clear();
}
public boolean isEmpty() {
return data.isEmpty();
}
@Override
public String toString() {
return "Buffer: " + data;
}
}
public class PipelineExample {
public static void main(String[] args) {
Exchanger<DataBuffer> exchanger = new Exchanger<>();
Thread producer = new Thread(new DataProducer(exchanger));
Thread consumer = new Thread(new DataConsumer(exchanger));
producer.start();
consumer.start();
}
}
class DataProducer implements Runnable {
private final Exchanger<DataBuffer> exchanger;
private DataBuffer currentBuffer = new DataBuffer();
private int counter = 0;
public DataProducer(Exchanger<DataBuffer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
while (true) {
// Fill buffer with data
for (int i = 0; i < 3; i++) {
currentBuffer.add(counter++);
System.out.println("Produced: " + (counter - 1));
Thread.sleep(500); // Simulate work
}
System.out.println("Producer sending full buffer: " + currentBuffer);
// Exchange full buffer for empty one
currentBuffer = exchanger.exchange(currentBuffer);
System.out.println("Producer received empty buffer");
Thread.sleep(1000); // Brief pause
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class DataConsumer implements Runnable {
private final Exchanger<DataBuffer> exchanger;
private DataBuffer currentBuffer = new DataBuffer();
public DataConsumer(Exchanger<DataBuffer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
while (true) {
// Start with empty buffer, exchange for full one
currentBuffer = exchanger.exchange(currentBuffer);
System.out.println("Consumer received full buffer: " + currentBuffer);
// Process data
for (int value : currentBuffer.getData()) {
System.out.println("Consumed: " + value);
Thread.sleep(1000); // Simulate processing time
}
// Clear buffer for next exchange
currentBuffer.clear();
System.out.println("Consumer sending back empty buffer");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Advanced Example: Multi-Stage Processing Pipeline
import java.util.concurrent.Exchanger;
class ProcessingData {
private String rawData;
private String processedData;
private boolean isProcessed;
public ProcessingData(String rawData) {
this.rawData = rawData;
this.isProcessed = false;
}
public void setProcessedData(String processedData) {
this.processedData = processedData;
this.isProcessed = true;
}
public String getRawData() { return rawData; }
public String getProcessedData() { return processedData; }
public boolean isProcessed() { return isProcessed; }
@Override
public String toString() {
return isProcessed ?
"Processed: " + processedData :
"Raw: " + rawData;
}
}
public class MultiStagePipeline {
public static void main(String[] args) {
// Create exchangers for each stage
Exchanger<ProcessingData> stage1To2 = new Exchanger<>();
Exchanger<ProcessingData> stage2To3 = new Exchanger<>();
Thread stage1 = new Thread(new DataLoader(stage1To2));
Thread stage2 = new Thread(new DataProcessor(stage1To2, stage2To3));
Thread stage3 = new Thread(new DataWriter(stage2To3));
stage1.start();
stage2.start();
stage3.start();
}
}
class DataLoader implements Runnable {
private final Exchanger<ProcessingData> exchanger;
private int dataId = 1;
public DataLoader(Exchanger<ProcessingData> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
while (true) {
// Load raw data
String rawData = "Data-" + dataId++;
ProcessingData data = new ProcessingData(rawData);
System.out.println("Stage 1 - Loaded: " + data);
// Send to processor
exchanger.exchange(data);
Thread.sleep(2000); // Simulate loading time
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class DataProcessor implements Runnable {
private final Exchanger<ProcessingData> inputExchanger;
private final Exchanger<ProcessingData> outputExchanger;
public DataProcessor(Exchanger<ProcessingData> inputExchanger,
Exchanger<ProcessingData> outputExchanger) {
this.inputExchanger = inputExchanger;
this.outputExchanger = outputExchanger;
}
@Override
public void run() {
try {
while (true) {
// Receive raw data from loader
ProcessingData data = inputExchanger.exchange(null);
// Process data
System.out.println("Stage 2 - Processing: " + data);
Thread.sleep(1500); // Simulate processing time
String processedData = data.getRawData().toUpperCase() + "-PROCESSED";
data.setProcessedData(processedData);
System.out.println("Stage 2 - Processed: " + data);
// Send to writer
outputExchanger.exchange(data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class DataWriter implements Runnable {
private final Exchanger<ProcessingData> exchanger;
public DataWriter(Exchanger<ProcessingData> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
while (true) {
// Receive processed data
ProcessingData data = exchanger.exchange(null);
System.out.println("Stage 3 - Writing: " + data);
Thread.sleep(1000); // Simulate writing time
System.out.println("Stage 3 - Completed writing: " + data);
System.out.println("--- Pipeline Cycle Complete ---");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Timeout Support
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TimeoutExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread fastThread = new Thread(new FastWorker(exchanger));
Thread slowThread = new Thread(new SlowWorker(exchanger));
fastThread.start();
slowThread.start();
}
}
class FastWorker implements Runnable {
private final Exchanger<String> exchanger;
public FastWorker(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("Fast worker waiting for exchange (2 seconds timeout)...");
// Wait for exchange with timeout
String received = exchanger.exchange("Fast Data", 2, TimeUnit.SECONDS);
System.out.println("Fast worker received: " + received);
} catch (TimeoutException e) {
System.out.println("Fast worker: Timeout! No partner arrived in time.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class SlowWorker implements Runnable {
private final Exchanger<String> exchanger;
public SlowWorker(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("Slow worker taking a long time...");
Thread.sleep(5000); // Sleep longer than timeout
// This will never be reached because fast worker times out
String received = exchanger.exchange("Slow Data");
System.out.println("Slow worker received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Key Characteristics and Best Practices
1. Blocking Nature
exchange()blocks until another thread arrives- Use timeout version for better responsiveness
2. Pairing Requirement
- Exactly two threads must meet at the exchange point
- Odd numbers of threads will cause deadlocks
3. Data Ownership
- After exchange, each thread owns the received data
- The sent data should not be modified after exchange
4. Error Handling
public class SafeExchangerUsage {
private final Exchanger<Data> exchanger = new Exchanger<>();
public void safeExchange(Data data) {
try {
Data received = exchanger.exchange(data, 10, TimeUnit.SECONDS);
processReceivedData(received);
} catch (TimeoutException e) {
handleTimeout(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleInterruption();
}
}
private void processReceivedData(Data data) {
// Process the exchanged data
}
private void handleTimeout(Data data) {
// Handle case when no partner arrived
System.out.println("No exchange partner arrived in time");
}
private void handleInterruption() {
// Clean up after interruption
System.out.println("Thread was interrupted during exchange");
}
}
Use Cases
- Pipeline Processing - Hand off data between processing stages
- Genetic Algorithms - Exchange chromosomes between populations
- Game Development - Swap game state between threads
- Data Processing - Alternate between filling and processing buffers
The Exchanger is a powerful tool for creating elegant, thread-safe data exchange patterns in concurrent applications.