When dealing with producer-consumer scenarios in concurrent Java applications, developers often reach for buffered queues like ArrayBlockingQueue or LinkedBlockingQueue. However, there's a special-purpose queue that enables the most direct form of handoff possible between threads: the SynchronousQueue.
What is SynchronousQueue?
SynchronousQueue is a blocking queue implementation in the java.util.concurrent package where each insertion operation must wait for a corresponding removal operation by another thread, and vice versa. Unlike buffered queues, it has zero capacity - it doesn't store any elements internally.
Think of it as a synchronous rendezvous point: the producer and consumer must meet at the exact same moment for the data transfer to occur.
Key Characteristics
- Zero Capacity: No internal storage for elements
- Direct Handoff: Elements are directly transferred from producer to consumer
- Blocking Operations:
put()blocks until another thread is ready totake(), andtake()blocks until another thread is ready toput() - Dual Policy: Can operate in fair or non-fair mode
Basic Usage and Examples
Example 1: Basic Producer-Consumer Pattern
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// Producer thread
Thread producer = new Thread(() -> {
try {
System.out.println("Producer: Attempting to put value 42...");
queue.put(42);
System.out.println("Producer: Value 42 successfully handed off!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // Simulate some delay
System.out.println("Consumer: Attempting to take value...");
Integer value = queue.take();
System.out.println("Consumer: Received value: " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
}
Output:
Producer: Attempting to put value 42... Consumer: Attempting to take value... Producer: Value 42 successfully handed off! Consumer: Received value: 42
Example 2: Multiple Producers and Consumers
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class MultipleHandoffs {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(4);
// Two producers
for (int i = 0; i < 2; i++) {
final int producerId = i;
executor.submit(() -> {
try {
String message = "Message from producer " + producerId;
queue.put(message);
System.out.println("Producer " + producerId + " delivered: " + message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Two consumers
for (int i = 0; i < 2; i++) {
final int consumerId = i;
executor.submit(() -> {
try {
String message = queue.take();
System.out.println("Consumer " + consumerId + " received: " + message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
Fair vs Non-Fair Mode
SynchronousQueue can be constructed with different transfer policies:
// Non-fair mode (default) - faster but not necessarily FIFO SynchronousQueue<Integer> nonFairQueue = new SynchronousQueue<>(); // Fair mode - guarantees FIFO ordering, but potentially slower SynchronousQueue<Integer> fairQueue = new SynchronousQueue<>(true);
Advanced Example: Work Stealing with Timeouts
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class WorkStealingWithTimeout {
public static void main(String[] args) {
SynchronousQueue<Task> queue = new SynchronousQueue<>();
// Producer with timeout
Thread producer = new Thread(() -> {
try {
Task task = new Task("Important Task");
System.out.println("Producer: Offering task with 2-second timeout...");
boolean offered = queue.offer(task, 2, TimeUnit.SECONDS);
if (offered) {
System.out.println("Producer: Task accepted within timeout!");
} else {
System.out.println("Producer: Timeout - no consumer available!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer that starts later
Thread consumer = new Thread(() -> {
try {
Thread.sleep(3000); // Start after producer timeout
System.out.println("Consumer: Polling for task...");
Task task = queue.poll(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println("Consumer: Got task: " + task);
} else {
System.out.println("Consumer: No task available");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
static class Task {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public String toString() {
return "Task{" + "name='" + name + '\'' + '}';
}
}
}
When to Use SynchronousQueue
Ideal Use Cases:
- Thread Pool Handoffs: Used internally by
Executors.newCachedThreadPool() - Task Passing Systems: When you need direct handoff between worker threads
- Backpressure Scenarios: When you want to naturally limit the rate of production
- Event Processing: For direct event passing between components
Comparison with Other Queues
| Queue Type | Capacity | Blocking Behavior | Use Case |
|---|---|---|---|
SynchronousQueue | 0 | Blocks until handoff | Direct transfer, backpressure |
ArrayBlockingQueue | Fixed | Blocks when full/empty | Bounded buffering |
LinkedBlockingQueue | Optional bound | Blocks when full/empty | Unbounded or bounded buffering |
PriorityBlockingQueue | Unbounded | Blocks when empty | Priority-based processing |
Performance Characteristics
- Low Latency: Direct handoff minimizes overhead
- Memory Efficient: No storage means minimal memory footprint
- Scalable: Performs well under high contention in fair mode
- Predictable: Provides natural backpressure without configuration
Best Practices and Pitfalls
- Always Use Timeouts: In production code, use timed versions (
offer(e, timeout, unit),poll(timeout, unit)) to avoid permanent blocking. - Monitor for Deadlocks: Ensure you have matching numbers of producers and consumers, or use timeouts to prevent system lockups.
- Consider Fairness: Use fair mode when order matters, but be aware of the performance trade-off.
- Error Handling: Always handle
InterruptedExceptionproperly to support graceful shutdown.
// Good practice example
public boolean safeTransfer(SynchronousQueue<Data> queue, Data data) {
try {
return queue.offer(data, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false; // Handle interruption appropriately
}
}
Conclusion
SynchronousQueue is a powerful tool in the Java concurrency toolkit that provides the most direct form of thread-to-thread communication possible. While it's not suitable for all use cases due to its zero-capacity nature, it excels in scenarios where you need:
- Direct handoff between producers and consumers
- Natural backpressure mechanism
- Minimal memory overhead
- Precise control over thread coordination
Understanding when and how to use SynchronousQueue can help you build more efficient and responsive concurrent applications in Java.