The BlockingQueue interface in Java represents a thread-safe queue that supports operations that wait for the queue to become non-empty when retrieving elements, and wait for space to become available when storing elements.
Core BlockingQueue Implementations
1. ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates a bounded {@link ArrayBlockingQueue} of capacity 3 shared by one producer
 * thread and one deliberately slow consumer thread.
 *
 * <p>The producer adds four items using both {@code put} and the timed {@code offer}; the
 * fourth {@code put} blocks until the consumer has removed an element.
 */
public class ArrayBlockingQueueExample {
    /** How long the consumer waits for another item before concluding the producer is done. */
    private static final long CONSUMER_IDLE_TIMEOUT_SECONDS = 2;

    /**
     * Starts the producer and the consumer and waits for both to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue with fixed capacity
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                queue.put("Item 1");
                System.out.println("Added: Item 1");

                boolean added = queue.offer("Item 2", 2, TimeUnit.SECONDS);
                System.out.println("Added Item 2: " + added);

                queue.put("Item 3");
                System.out.println("Added: Item 3");

                // This will block until space is available
                queue.put("Item 4");
                System.out.println("Added: Item 4");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // Let producer fill the queue

                // A timed poll replaces the isEmpty()-then-take() pattern: checking emptiness
                // and removing are two separate steps, so the queue could be momentarily empty
                // while the producer still has work, making the consumer quit early and
                // leaving the producer blocked in put() forever.
                String item;
                while ((item = queue.poll(CONSUMER_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) != null) {
                    System.out.println("Consumed: " + item);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
2. LinkedBlockingQueue
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Shows the two ways of constructing a {@link LinkedBlockingQueue} (with and without a
 * capacity) and wires three order producers and two order consumers to one shared queue.
 */
public class LinkedBlockingQueueExample {
    /**
     * Creates the queues and starts all producer threads followed by all consumer threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // With a capacity argument the queue is bounded; without one it is
        // essentially unbounded (Integer.MAX_VALUE).
        BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
        BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

        // Queue shared by every producer and consumer below
        BlockingQueue<Order> orderQueue = new LinkedBlockingQueue<>();

        // Three producers
        int producerIndex = 0;
        while (producerIndex < 3) {
            Runnable producerTask = new OrderProducer(orderQueue, "Producer-" + producerIndex);
            Thread producerThread = new Thread(producerTask);
            producerThread.start();
            producerIndex++;
        }

        // Two consumers
        int consumerIndex = 0;
        while (consumerIndex < 2) {
            Runnable consumerTask = new OrderConsumer(orderQueue, "Consumer-" + consumerIndex);
            Thread consumerThread = new Thread(consumerTask);
            consumerThread.start();
            consumerIndex++;
        }
    }
}
/** Immutable order record carrying an identifier and a product name. */
class Order {
    private final String id;
    private final String product;

    /**
     * @param id      identifier of the order
     * @param product name of the ordered product
     */
    public Order(String id, String product) {
        this.id = id;
        this.product = product;
    }

    /** Renders the order as {@code Order{id='...', product='...'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("Order{id='").append(id);
        text.append("', product='").append(product);
        text.append("'}");
        return text.toString();
    }
}
/**
 * Runnable that keeps creating {@link Order} objects and putting them on a shared queue
 * until its thread is interrupted.
 */
class OrderProducer implements Runnable {
    private final BlockingQueue<Order> queue;
    private final String name;
    // Next order number to hand out; only touched by the thread executing run().
    private int orderId = 1;

    /**
     * @param queue queue that receives the produced orders
     * @param name  producer name, used as a prefix for order ids and in log output
     */
    public OrderProducer(BlockingQueue<Order> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    /**
     * Produces orders in an endless loop, pausing 500-1500 ms between them. Returns once the
     * thread is interrupted, restoring the interrupt flag before doing so.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Capture the id once so the order id and the product index are derived from
                // the same number; previously the product was computed from the counter after
                // it had already been incremented, i.e. from the next order's id.
                int currentId = orderId++;
                Order order = new Order(name + "-Order-" + currentId, "Product-" + (currentId % 5));
                queue.put(order);
                System.out.println(name + " produced: " + order);
                Thread.sleep(500 + (int) (Math.random() * 1000));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Runnable that repeatedly takes {@link Order} objects from a shared queue and simulates
 * processing them until its thread is interrupted.
 */
class OrderConsumer implements Runnable {
    private final BlockingQueue<Order> queue;
    private final String name;

    /**
     * @param queue queue the orders are taken from
     * @param name  consumer name used in log output
     */
    public OrderConsumer(BlockingQueue<Order> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    /**
     * Blocks for the next order, logs it, then sleeps 1000-2500 ms to simulate processing.
     * On interruption the interrupt flag is restored and the method returns.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                Order next = queue.take();
                System.out.println(name + " consumed: " + next);

                // Simulated processing time
                int processingMs = 1000 + (int) (Math.random() * 1500);
                Thread.sleep(processingMs);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
3. PriorityBlockingQueue
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
/**
 * Shows a {@link PriorityBlockingQueue} with natural ordering and one with a custom
 * comparator, then lets two processors drain the custom-ordered task queue.
 */
public class PriorityBlockingQueueExample {
    /**
     * Fills the task queue with four tasks of different priorities and starts two processors.
     *
     * @param args unused
     * @throws InterruptedException declared by {@code BlockingQueue.put}
     */
    public static void main(String[] args) throws InterruptedException {
        // Natural ordering
        BlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue<>();

        // Custom ordering: reversing the enum order puts higher priorities first
        Comparator<Task> highestPriorityFirst = Comparator.comparing(Task::getPriority).reversed();
        int initialCapacity = 11;
        BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>(initialCapacity, highestPriorityFirst);

        // Tasks are inserted out of priority order on purpose
        Task low = new Task("Low Priority Task", Task.Priority.LOW);
        Task high = new Task("High Priority Task", Task.Priority.HIGH);
        Task medium = new Task("Medium Priority Task", Task.Priority.MEDIUM);
        Task urgent = new Task("Urgent Task", Task.Priority.URGENT);
        taskQueue.put(low);
        taskQueue.put(high);
        taskQueue.put(medium);
        taskQueue.put(urgent);

        // Consumers will process tasks in priority order
        int processorIndex = 0;
        while (processorIndex < 2) {
            Thread worker = new Thread(new TaskProcessor(taskQueue, "Processor-" + processorIndex));
            worker.start();
            processorIndex++;
        }
    }
}
/** Immutable unit of work with a display name and a {@link Priority}. */
class Task {
    /** Priority levels, declared from least to most important. */
    enum Priority { LOW, MEDIUM, HIGH, URGENT }

    private static final String DESCRIPTION_FORMAT = "Task{name='%s', priority=%s}";

    private final String name;
    private final Priority priority;

    /**
     * @param name     display name of the task
     * @param priority priority of the task
     */
    public Task(String name, Priority priority) {
        this.priority = priority;
        this.name = name;
    }

    /** @return the priority given at construction time */
    public Priority getPriority() {
        return this.priority;
    }

    /** Renders the task as {@code Task{name='...', priority=...}}. */
    @Override
    public String toString() {
        return String.format(DESCRIPTION_FORMAT, this.name, this.priority);
    }
}
/**
 * Runnable that takes {@link Task} objects from a queue one at a time and simulates
 * processing each for one second, until its thread is interrupted.
 */
class TaskProcessor implements Runnable {
    private final BlockingQueue<Task> queue;
    private final String name;

    /**
     * @param queue queue the tasks are taken from
     * @param name  processor name used in log output
     */
    public TaskProcessor(BlockingQueue<Task> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    /**
     * Loops forever: blocks for the next task, logs it, sleeps one second. On interruption
     * the interrupt flag is restored and the method returns.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                Task current = queue.take();
                System.out.println(name + " processing: " + current);
                Thread.sleep(1000); // Simulate processing
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
4. SynchronousQueue
import java.util.concurrent.SynchronousQueue;
/**
 * Demonstrates direct hand-off through a fair {@link SynchronousQueue}: every {@code put}
 * blocks until a consumer calls {@code take}, and vice versa.
 */
public class SynchronousQueueExample {
    /**
     * Starts one producer that hands over four items and one slower consumer that receives
     * exactly those four items, after which both threads end and the JVM can exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Direct handoff between producers and consumers
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // fair ordering

        // Shared by both threads so the consumer knows how many hand-offs to expect
        final String[] items = {"Item1", "Item2", "Item3", "Item4"};

        // Producer
        new Thread(() -> {
            try {
                for (String item : items) {
                    System.out.println("Producer trying to put: " + item);
                    queue.put(item); // Blocks until consumer takes
                    System.out.println("Producer successfully put: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Consumer (slower)
        new Thread(() -> {
            try {
                Thread.sleep(2000); // Start consuming after delay

                // Take only as many items as will be produced. An unconditional
                // while (true) would block in take() forever once the producer has
                // finished, keeping this non-daemon thread and the JVM alive.
                for (int taken = 0; taken < items.length; taken++) {
                    String item = queue.take(); // Blocks until producer puts
                    System.out.println("Consumer took: " + item);
                    Thread.sleep(2000); // Simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
5. DelayQueue
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates a {@link DelayQueue}: elements can only be taken once their delay has
 * expired, regardless of the order in which they were inserted.
 */
public class DelayQueueExample {
    /**
     * Enqueues three tasks due 3, 1 and 5 seconds from now and starts a consumer thread that
     * takes them as they become due.
     *
     * @param args unused
     * @throws InterruptedException declared for the caller; not thrown by the visible code
     */
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

        // All due times are relative to the same reference instant
        long reference = System.currentTimeMillis();
        DelayedTask dueInThree = new DelayedTask("Task 1", reference + 3000); // 3 seconds
        DelayedTask dueInOne = new DelayedTask("Task 2", reference + 1000);   // 1 second
        DelayedTask dueInFive = new DelayedTask("Task 3", reference + 5000);  // 5 seconds
        delayQueue.put(dueInThree);
        delayQueue.put(dueInOne);
        delayQueue.put(dueInFive);

        System.out.println("Tasks added to delay queue");

        // Consumer that processes tasks when they become available
        Runnable drainWhenDue = () -> {
            try {
                while (!delayQueue.isEmpty()) {
                    // take() blocks until the head element's delay has expired
                    DelayedTask due = delayQueue.take();
                    System.out.println("Processed: " + due);
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        };
        Thread consumer = new Thread(drainWhenDue);
        consumer.start();
    }
}
/**
 * A named {@link Delayed} element that becomes available at a fixed wall-clock instant,
 * expressed in milliseconds as returned by {@link System#currentTimeMillis()}.
 */
class DelayedTask implements Delayed {
    private final String name;
    private final long startTime;

    /**
     * @param name      display name of the task
     * @param startTime instant, in epoch milliseconds, at which the task becomes due
     */
    public DelayedTask(String name, long startTime) {
        this.name = name;
        this.startTime = startTime;
    }

    /**
     * Returns the time remaining until {@code startTime}, converted to {@code unit}.
     * The result is zero or negative once the task is due.
     *
     * @param unit unit of the returned delay
     * @return remaining delay in the requested unit
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by due time, earliest first.
     *
     * <p>Two {@code DelayedTask} instances are compared by their exact start times. Any other
     * {@link Delayed} implementation is compared by remaining delay instead of being cast,
     * so mixing implementations no longer throws {@link ClassCastException}.
     *
     * @param other the element to compare against
     * @return negative, zero or positive if this task is due before, together with, or after
     *         {@code other}
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayedTask) {
            return Long.compare(this.startTime, ((DelayedTask) other).startTime);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    /** Renders the task as {@code DelayedTask{name='...', startTime=...}}. */
    @Override
    public String toString() {
        return "DelayedTask{name='" + name + "', startTime=" + startTime + "}";
    }
}
Comparison and Usage Patterns
Performance Characteristics Comparison
import java.util.concurrent.*;
/**
 * Rough timing comparison of several {@link BlockingQueue} implementations: one producer
 * puts a fixed number of integers while one consumer takes the same number.
 */
public class BlockingQueueComparison {
    /**
     * Runs the same producer/consumer workload against each queue implementation in turn.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining workers
     */
    public static void main(String[] args) throws InterruptedException {
        int capacity = 1000;
        int elements = 10000;

        // Test different implementations
        testQueue(new ArrayBlockingQueue<>(capacity), "ArrayBlockingQueue", elements);
        testQueue(new LinkedBlockingQueue<>(capacity), "LinkedBlockingQueue", elements);
        testQueue(new PriorityBlockingQueue<>(capacity), "PriorityBlockingQueue", elements);
        testQueue(new SynchronousQueue<>(), "SynchronousQueue", elements);
    }

    /**
     * Transfers {@code elements} integers through {@code queue} with one producer and one
     * consumer thread and prints the elapsed time in milliseconds.
     *
     * @param queue    queue under test
     * @param name     label used in the printed output
     * @param elements number of integers to put and take
     * @throws InterruptedException if interrupted while waiting for the worker threads
     */
    private static void testQueue(BlockingQueue<Integer> queue, String name, int elements)
            throws InterruptedException {
        System.out.println("\nTesting " + name);

        // nanoTime is monotonic; currentTimeMillis follows the wall clock and can jump
        // (e.g. on clock adjustment), which would distort an elapsed-time measurement.
        long startNanos = System.nanoTime();

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < elements; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < elements; i++) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(name + " completed in " + elapsedMs + "ms");
    }
}
Producer-Consumer with Multiple Implementations
import java.util.concurrent.*;
import java.util.*;
/**
 * Runs one producer/consumer pair against each of three queue implementations for ten
 * seconds and then interrupts every worker thread.
 */
public class MultiQueueProducerConsumer {
    /**
     * Starts all producers, then all consumers, sleeps ten seconds and interrupts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One queue per implementation under demonstration
        List<BlockingQueue<String>> queues = Arrays.asList(
                new ArrayBlockingQueue<>(10),
                new LinkedBlockingQueue<>(10),
                new PriorityBlockingQueue<>(10));

        // One producer per queue, started immediately
        List<Thread> producers = new ArrayList<>();
        int producerIndex = 0;
        for (BlockingQueue<String> target : queues) {
            Thread producerThread = new Thread(new StringProducer(target, "Producer-" + producerIndex));
            producers.add(producerThread);
            producerThread.start();
            producerIndex++;
        }

        // One consumer per queue, started after every producer is running
        List<Thread> consumers = new ArrayList<>();
        int consumerIndex = 0;
        for (BlockingQueue<String> source : queues) {
            Thread consumerThread = new Thread(new StringConsumer(source, "Consumer-" + consumerIndex));
            consumers.add(consumerThread);
            consumerThread.start();
            consumerIndex++;
        }

        // Let it run for a while
        try {
            Thread.sleep(10000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }

        // Ask every worker to stop; producers first, then consumers
        for (Thread producerThread : producers) {
            producerThread.interrupt();
        }
        for (Thread consumerThread : consumers) {
            consumerThread.interrupt();
        }
    }
}
/**
 * Runnable that keeps offering numbered string items to a queue, giving up on an item if
 * the queue stays full for 100 ms, until its thread is interrupted.
 */
class StringProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String name;
    // Sequence number of the next item; only used by the thread executing run().
    private int counter = 0;

    /**
     * @param queue queue that receives the produced items
     * @param name  producer name, used as item prefix and in log output
     */
    public StringProducer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    /**
     * Produces items until interrupted, pausing 200-500 ms between attempts. The interrupt
     * flag is left set when the method returns because of an interruption.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        try {
            while (true) {
                if (self.isInterrupted()) {
                    return;
                }

                String item = name + "-Item-" + counter;
                counter++;

                boolean accepted = queue.offer(item, 100, TimeUnit.MILLISECONDS);
                String outcome = accepted
                        ? name + " produced: " + item
                        : name + " failed to produce: " + item + " (queue full)";
                System.out.println(outcome);

                int pauseMs = 200 + (int) (Math.random() * 300);
                Thread.sleep(pauseMs);
            }
        } catch (InterruptedException interrupted) {
            self.interrupt();
        }
    }
}
/**
 * Runnable that keeps polling a queue for string items, waiting up to 500 ms per attempt,
 * until its thread is interrupted.
 */
class StringConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String name;

    /**
     * @param queue queue the items are polled from
     * @param name  consumer name used in log output
     */
    public StringConsumer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    /**
     * Consumes items until interrupted, pausing 300-700 ms between attempts. The interrupt
     * flag is left set when the method returns because of an interruption.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        try {
            while (true) {
                if (self.isInterrupted()) {
                    return;
                }

                String item = queue.poll(500, TimeUnit.MILLISECONDS);
                String outcome = (item == null)
                        ? name + " found queue empty"
                        : name + " consumed: " + item;
                System.out.println(outcome);

                int pauseMs = 300 + (int) (Math.random() * 400);
                Thread.sleep(pauseMs);
            }
        } catch (InterruptedException interrupted) {
            self.interrupt();
        }
    }
}
Advanced Usage: Batching with BlockingQueue
import java.util.concurrent.*;
import java.util.*;
/**
 * Collects {@link DataItem} objects from a {@link BlockingQueue} into batches of at most
 * {@code batchSize} elements and processes each batch on a background thread.
 *
 * <p>After {@link #stopProcessing()} the worker keeps running until the queue is empty.
 */
public class BatchProcessor {
    private final BlockingQueue<DataItem> queue;
    private final int batchSize;
    private final long timeoutMs;
    // volatile so that the worker thread sees the stop request issued by another thread
    private volatile boolean running = true;

    /**
     * @param queue     queue the items are read from
     * @param batchSize maximum number of items per batch; must be at least 1
     * @param timeoutMs how long, in milliseconds, to wait for the first item of a batch
     * @throws NullPointerException     if {@code queue} is null
     * @throws IllegalArgumentException if {@code batchSize} is less than 1
     */
    public BatchProcessor(BlockingQueue<DataItem> queue, int batchSize, long timeoutMs) {
        // Fail at construction time rather than later on the worker thread
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1, got " + batchSize);
        }
        this.queue = Objects.requireNonNull(queue, "queue");
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
    }

    /** Starts a new thread that runs the batching loop. */
    public void startProcessing() {
        Thread processor = new Thread(this::processBatches);
        processor.start();
    }

    /** Requests the batching loop to end once the queue has been drained. */
    public void stopProcessing() {
        running = false;
    }

    /**
     * Batching loop: waits up to {@code timeoutMs} for a first item, then gathers further
     * items (waiting at most 100 ms for each) until the batch is full or no item arrives,
     * and processes the batch. Ends when stopped and the queue is empty, or on interruption.
     */
    private void processBatches() {
        List<DataItem> batch = new ArrayList<>(batchSize);

        while (running || !queue.isEmpty()) {
            try {
                batch.clear();

                // Wait for first element
                DataItem firstItem = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (firstItem == null) {
                    continue;
                }
                batch.add(firstItem);

                // Try to gather more items up to batchSize. This is intentionally not
                // conditioned on `running`: otherwise the backlog left after
                // stopProcessing() would be flushed one item per batch.
                while (batch.size() < batchSize) {
                    DataItem item = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (item == null) {
                        break; // No more items immediately available
                    }
                    batch.add(item);
                }

                // Process the batch
                processBatch(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Prints the batch and sleeps 100 ms to simulate work. If interrupted while sleeping,
     * the interrupt flag is restored.
     *
     * @param batch items to process; the list is reused by the caller for the next batch
     */
    private void processBatch(List<DataItem> batch) {
        System.out.println("Processing batch of " + batch.size() + " items: " + batch);
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Puts an item on the queue, blocking if the queue is full.
     *
     * @param item item to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void addItem(DataItem item) throws InterruptedException {
        queue.put(item);
    }

    /**
     * Feeds 23 items to a processor with batch size 5 at 150 ms intervals, waits two
     * seconds and then stops the processor.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping or adding
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<DataItem> queue = new LinkedBlockingQueue<>();
        BatchProcessor processor = new BatchProcessor(queue, 5, 1000);

        processor.startProcessing();

        // Add items
        for (int i = 0; i < 23; i++) {
            processor.addItem(new DataItem("Item-" + i));
            Thread.sleep(150); // Variable rate
        }

        Thread.sleep(2000); // Let processor finish
        processor.stopProcessing();
    }
}
/** Immutable wrapper around a single string payload. */
class DataItem {
    private final String payload;

    /**
     * @param data the text carried by this item
     */
    public DataItem(String data) {
        payload = data;
    }

    /** @return the wrapped text exactly as passed to the constructor */
    @Override
    public String toString() {
        return payload;
    }
}
Key Differences and When to Use Each
| Implementation | Bounded | Ordering | Performance | Use Cases |
|---|---|---|---|---|
| ArrayBlockingQueue | Yes | FIFO | Good for fixed size | Fixed-size buffers, resource pools |
| LinkedBlockingQueue | Optional | FIFO | Good for variable size | General purpose, task queues |
| PriorityBlockingQueue | No | Priority | O(log n) insert and removal | Task scheduling, priority processing |
| SynchronousQueue | Yes (0 capacity) | FIFO/LIFO | Direct handoff | Direct thread communication |
| DelayQueue | No | Delay-based | O(log n) insert and removal | Scheduling, timeout handling |
Best Practices
- Choose the right implementation based on your ordering and bounding needs
- Always handle InterruptedException properly
- Use timeouts to avoid indefinite blocking
- Consider capacity carefully for bounded queues
- Monitor queue sizes in production applications
- Use the appropriate queue for the workload characteristics
BlockingQueue implementations are essential building blocks for concurrent applications in Java, providing robust thread-safe communication between producers and consumers.