BlockingQueue Interface Implementations in Java

The BlockingQueue interface in Java represents a thread-safe queue whose operations can wait for the queue to become non-empty when retrieving an element, and wait for space to become available when storing one.

Core BlockingQueue Implementations

1. ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates a bounded {@link ArrayBlockingQueue} shared by one producer
 * thread and one consumer thread.
 *
 * <p>The producer inserts four items into a queue of capacity three; the
 * consumer waits two seconds before it starts draining, pausing one second
 * after each item.
 */
public class ArrayBlockingQueueExample {

    /**
     * Starts the producer and the consumer and waits for both to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        // The capacity (3) is fixed when the queue is constructed.
        final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> produce(buffer));
        Thread consumer = new Thread(() -> consume(buffer));

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    /** Inserts four items, mixing the blocking {@code put} with a timed {@code offer}. */
    private static void produce(BlockingQueue<String> buffer) {
        try {
            buffer.put("Item 1");
            System.out.println("Added: Item 1");

            // Timed insert: reports false instead of blocking past two seconds.
            boolean accepted = buffer.offer("Item 2", 2, TimeUnit.SECONDS);
            System.out.println("Added Item 2: " + accepted);

            buffer.put("Item 3");
            System.out.println("Added: Item 3");

            // With three items already queued, this put waits until space is available.
            buffer.put("Item 4");
            System.out.println("Added: Item 4");
        } catch (InterruptedException interrupted) {
            // Restore the flag so whoever owns the thread can observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** Waits for the producer to fill the queue, then drains it one item per second. */
    private static void consume(BlockingQueue<String> buffer) {
        try {
            Thread.sleep(2000); // Give the producer time to fill the queue
            while (!buffer.isEmpty()) {
                System.out.println("Consumed: " + buffer.take());
                Thread.sleep(1000);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

2. LinkedBlockingQueue

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Shows both ways of constructing a {@link LinkedBlockingQueue} and runs a
 * producer/consumer demo with three producers and two consumers sharing one
 * unbounded order queue.
 */
public class LinkedBlockingQueueExample {

    /**
     * Builds the queues and starts the worker threads. The workers loop
     * until interrupted, so this demo keeps running until the JVM is stopped.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // A capacity argument makes the queue bounded ...
        BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
        // ... while the no-arg constructor is effectively unbounded (Integer.MAX_VALUE).
        BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

        // Single queue shared by every producer and consumer below.
        final BlockingQueue<Order> orders = new LinkedBlockingQueue<>();

        // Three producers
        int producerIndex = 0;
        while (producerIndex < 3) {
            Runnable producerTask = new OrderProducer(orders, "Producer-" + producerIndex);
            new Thread(producerTask).start();
            producerIndex++;
        }

        // Two consumers
        int consumerIndex = 0;
        while (consumerIndex < 2) {
            Runnable consumerTask = new OrderConsumer(orders, "Consumer-" + consumerIndex);
            new Thread(consumerTask).start();
            consumerIndex++;
        }
    }
}
/** Immutable value holding an order identifier and the ordered product. */
class Order {
    private final String id;
    private final String product;

    /**
     * @param id      identifier of the order
     * @param product name of the ordered product
     */
    public Order(String id, String product) {
        this.product = product;
        this.id = id;
    }

    /** Renders the order as {@code Order{id='...', product='...'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order{id='");
        text.append(id);
        text.append("', product='");
        text.append(product);
        text.append("'}");
        return text.toString();
    }
}
/**
 * Runnable that keeps creating {@link Order} objects and putting them on a
 * shared queue until its thread is interrupted.
 */
class OrderProducer implements Runnable {
    private final BlockingQueue<Order> queue;
    private final String name;
    /** Sequence number for the next order; starts at 1. */
    private int orderId = 1;

    /**
     * @param queue queue that receives the produced orders
     * @param name  label used in order ids and log output
     */
    public OrderProducer(BlockingQueue<Order> queue, String name) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * Produces orders in an endless loop, pausing 500-1499 ms after each one.
     * An interrupt ends the loop and is re-asserted on the thread.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                Order created = nextOrder();
                queue.put(created);
                System.out.println(name + " produced: " + created);

                int pauseMillis = 500 + (int) (Math.random() * 1000);
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the next order and advances the sequence number. */
    private Order nextOrder() {
        int currentId = orderId++;
        // The product index is derived from the already-incremented counter.
        int productIndex = orderId % 5;
        return new Order(name + "-Order-" + currentId, "Product-" + productIndex);
    }
}
/**
 * Runnable that keeps taking {@link Order} objects from a shared queue and
 * "processing" them until its thread is interrupted.
 */
class OrderConsumer implements Runnable {
    private final BlockingQueue<Order> queue;
    private final String name;

    /**
     * @param queue queue to take orders from
     * @param name  label used in log output
     */
    public OrderConsumer(BlockingQueue<Order> queue, String name) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * Takes orders in an endless loop, blocking while the queue is empty and
     * sleeping 1000-2499 ms per order to simulate processing. An interrupt
     * ends the loop and is re-asserted on the thread.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                Order received = queue.take();
                System.out.println(name + " consumed: " + received);

                // Simulated processing time
                int workMillis = 1000 + (int) (Math.random() * 1500);
                Thread.sleep(workMillis);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

3. PriorityBlockingQueue

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
/**
 * Demonstrates {@link PriorityBlockingQueue} with natural ordering and with a
 * custom comparator that hands out the highest-priority {@link Task} first.
 */
public class PriorityBlockingQueueExample {

    /**
     * Queues four tasks of different priorities and starts two processors
     * that take them from the queue.
     *
     * @param args unused
     * @throws InterruptedException declared by {@code BlockingQueue.put}
     */
    public static void main(String[] args) throws InterruptedException {
        // Without a comparator, elements are ordered by their natural ordering.
        BlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue<>();

        // Reversing the priority comparator puts the highest priority at the head.
        Comparator<Task> highestFirst = Comparator.comparing(Task::getPriority).reversed();
        final int initialCapacity = 11;
        BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>(initialCapacity, highestFirst);

        // Insertion order is deliberately not priority order.
        Task[] submitted = {
            new Task("Low Priority Task", Task.Priority.LOW),
            new Task("High Priority Task", Task.Priority.HIGH),
            new Task("Medium Priority Task", Task.Priority.MEDIUM),
            new Task("Urgent Task", Task.Priority.URGENT)
        };
        for (Task task : submitted) {
            taskQueue.put(task);
        }

        // Two processors take tasks from the head of the queue.
        int processorIndex = 0;
        while (processorIndex < 2) {
            Runnable processor = new TaskProcessor(taskQueue, "Processor-" + processorIndex);
            new Thread(processor).start();
            processorIndex++;
        }
    }
}
/** Immutable unit of work consisting of a display name and a priority. */
class Task {
    /** Priority levels, declared from lowest to highest. */
    enum Priority { LOW, MEDIUM, HIGH, URGENT }

    private final String name;
    private final Priority priority;

    /**
     * @param name     display name of the task
     * @param priority priority assigned to the task
     */
    public Task(String name, Priority priority) {
        this.priority = priority;
        this.name = name;
    }

    /** @return the priority given at construction */
    public Priority getPriority() {
        return this.priority;
    }

    /** Renders the task as {@code Task{name='...', priority=...}}. */
    @Override
    public String toString() {
        final Object[] fields = {name, priority};
        return String.format("Task{name='%s', priority=%s}", fields);
    }
}
/**
 * Runnable that keeps taking {@link Task} objects from a queue and
 * "processing" them until its thread is interrupted.
 */
class TaskProcessor implements Runnable {
    private final BlockingQueue<Task> queue;
    private final String name;

    /**
     * @param queue queue to take tasks from
     * @param name  label used in log output
     */
    public TaskProcessor(BlockingQueue<Task> queue, String name) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * Takes tasks in an endless loop, blocking while the queue is empty and
     * sleeping one second per task. An interrupt ends the loop and is
     * re-asserted on the thread.
     */
    @Override
    public void run() {
        try {
            for (;;) {
                Task next = queue.take();
                System.out.println(name + " processing: " + next);
                Thread.sleep(1000); // Simulated processing time
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

4. SynchronousQueue

import java.util.concurrent.SynchronousQueue;
/**
 * Demonstrates the direct hand-off of {@link SynchronousQueue}: every
 * {@code put} blocks until another thread performs the matching {@code take}.
 *
 * <p>The consumer receives exactly as many items as the producer sends, so
 * both threads finish and the program terminates once the last item has been
 * handed over.
 */
public class SynchronousQueueExample {

    /**
     * Starts one producer and one slower consumer that exchange four items.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Direct handoff between producers and consumers
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // fair ordering

        // Shared with the consumer so it knows how many hand-offs to expect.
        final String[] items = {"Item1", "Item2", "Item3", "Item4"};

        // Producer
        new Thread(() -> {
            try {
                for (String item : items) {
                    System.out.println("Producer trying to put: " + item);
                    queue.put(item); // Blocks until consumer takes
                    System.out.println("Producer successfully put: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Consumer (slower)
        new Thread(() -> {
            try {
                Thread.sleep(2000); // Start consuming after delay
                // Bounded loop: an unbounded one would block in take() forever
                // after the producer is done and keep the JVM alive.
                for (int received = 0; received < items.length; received++) {
                    String item = queue.take(); // Blocks until producer puts
                    System.out.println("Consumer took: " + item);
                    Thread.sleep(2000); // Simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

5. DelayQueue

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link DelayQueue}: elements can only be taken once their
 * delay has expired, regardless of the order in which they were added.
 */
public class DelayQueueExample {

    /**
     * Queues three tasks due 3 s, 1 s and 5 s from now and starts a thread
     * that processes them as they become available.
     *
     * @param args unused
     * @throws InterruptedException never thrown by the current body; kept in the signature
     */
    public static void main(String[] args) throws InterruptedException {
        final DelayQueue<DelayedTask> pending = new DelayQueue<>();

        // All due times are offsets from the same reference instant.
        final long base = System.currentTimeMillis();
        pending.put(new DelayedTask("Task 1", base + 3000)); // due in 3 seconds
        pending.put(new DelayedTask("Task 2", base + 1000)); // due in 1 second
        pending.put(new DelayedTask("Task 3", base + 5000)); // due in 5 seconds
        System.out.println("Tasks added to delay queue");

        Thread worker = new Thread(() -> drain(pending));
        worker.start();
    }

    /** Takes and prints tasks until the queue reports empty or the thread is interrupted. */
    private static void drain(DelayQueue<DelayedTask> pending) {
        try {
            while (!pending.isEmpty()) {
                // take() blocks until the head element's delay has expired
                System.out.println("Processed: " + pending.take());
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * A named task that becomes available at a fixed wall-clock instant,
 * expressed in milliseconds as returned by {@link System#currentTimeMillis()}.
 */
class DelayedTask implements Delayed {
    private final String name;
    private final long startTime;

    /**
     * @param name      display name of the task
     * @param startTime epoch-millisecond instant at which the task becomes due
     */
    public DelayedTask(String name, long startTime) {
        this.name = name;
        this.startTime = startTime;
    }

    /**
     * Returns the time remaining until {@code startTime}.
     *
     * @param unit unit in which to express the result
     * @return remaining delay in {@code unit}; zero or negative once the task is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = startTime - System.currentTimeMillis();
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by due time, earliest first.
     *
     * <p>Two {@code DelayedTask}s are compared by their exact start times.
     * Any other {@link Delayed} is compared by remaining delay instead of
     * being cast, so mixing implementations does not throw
     * {@link ClassCastException}.
     *
     * @param other the element to compare against
     * @return negative, zero or positive if this task is due before, at the
     *         same time as, or after {@code other}
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayedTask) {
            return Long.compare(this.startTime, ((DelayedTask) other).startTime);
        }
        // Unknown implementation: compare remaining delays in a common unit.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public String toString() {
        return "DelayedTask{name='" + name + "', startTime=" + startTime + "}";
    }
}

Comparison and Usage Patterns

Performance Characteristics Comparison

import java.util.concurrent.*;
/**
 * Rough timing comparison of several {@link BlockingQueue} implementations:
 * one producer and one consumer push a fixed number of integers through each
 * queue and the elapsed wall-clock time is printed.
 */
public class BlockingQueueComparison {

    /**
     * Runs the transfer test once per queue implementation.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 1000;
        final int elements = 10000;

        // Each implementation is measured in turn
        testQueue(new ArrayBlockingQueue<>(capacity), "ArrayBlockingQueue", elements);
        testQueue(new LinkedBlockingQueue<>(capacity), "LinkedBlockingQueue", elements);
        testQueue(new PriorityBlockingQueue<>(capacity), "PriorityBlockingQueue", elements);
        testQueue(new SynchronousQueue<>(), "SynchronousQueue", elements);
    }

    /**
     * Transfers {@code elements} integers through {@code queue} with one
     * producer and one consumer thread, then prints the elapsed milliseconds.
     *
     * @param queue    queue under test
     * @param name     label printed with the result
     * @param elements number of integers to transfer
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    private static void testQueue(BlockingQueue<Integer> queue, String name, int elements)
            throws InterruptedException {
        System.out.println("\nTesting " + name);
        final long startedAt = System.currentTimeMillis();

        Thread producer = new Thread(() -> fill(queue, elements));
        Thread consumer = new Thread(() -> drain(queue, elements));

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        long finishedAt = System.currentTimeMillis();
        long elapsed = finishedAt - startedAt;
        System.out.println(name + " completed in " + elapsed + "ms");
    }

    /** Puts the integers {@code 0..count-1} on the queue, blocking when it is full. */
    private static void fill(BlockingQueue<Integer> queue, int count) {
        try {
            int next = 0;
            while (next < count) {
                queue.put(next);
                next++;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Takes {@code count} elements from the queue, blocking when it is empty. */
    private static void drain(BlockingQueue<Integer> queue, int count) {
        try {
            int remaining = count;
            while (remaining > 0) {
                queue.take();
                remaining--;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

Producer-Consumer with Multiple Implementations

import java.util.concurrent.*;
import java.util.*;
/**
 * Runs one producer/consumer pair against each of three different
 * {@link BlockingQueue} implementations for ten seconds, then interrupts
 * every worker thread.
 */
public class MultiQueueProducerConsumer {

    /**
     * Starts all producers, then all consumers, lets them run and finally
     * interrupts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One queue per implementation; for PriorityBlockingQueue the
        // argument is an initial capacity rather than a bound.
        List<BlockingQueue<String>> queues = Arrays.asList(
            new ArrayBlockingQueue<>(10),
            new LinkedBlockingQueue<>(10),
            new PriorityBlockingQueue<>(10)
        );

        // One producer per queue
        List<Thread> producers = new ArrayList<>();
        int producerIndex = 0;
        for (BlockingQueue<String> target : queues) {
            Thread worker = new Thread(new StringProducer(target, "Producer-" + producerIndex));
            producers.add(worker);
            worker.start();
            producerIndex++;
        }

        // One consumer per queue
        List<Thread> consumers = new ArrayList<>();
        int consumerIndex = 0;
        for (BlockingQueue<String> source : queues) {
            Thread worker = new Thread(new StringConsumer(source, "Consumer-" + consumerIndex));
            consumers.add(worker);
            worker.start();
            consumerIndex++;
        }

        // Let the workers run for ten seconds
        try {
            Thread.sleep(10000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }

        // Ask every worker to stop, producers first
        for (Thread producer : producers) {
            producer.interrupt();
        }
        for (Thread consumer : consumers) {
            consumer.interrupt();
        }
    }
}
/**
 * Runnable that offers sequentially numbered strings to a queue until its
 * thread is interrupted.
 */
class StringProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String name;
    /** Number appended to the next item; starts at 0. */
    private int counter = 0;

    /**
     * @param queue queue that receives the produced strings
     * @param name  label used as item prefix and in log output
     */
    public StringProducer(BlockingQueue<String> queue, String name) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * While the thread is not interrupted: builds the next item, offers it
     * with a 100 ms timeout, reports success or failure, then sleeps for
     * 200-499 ms. An interrupt during a blocking call ends the loop and is
     * re-asserted on the thread.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        try {
            while (!self.isInterrupted()) {
                String item = name + "-Item-" + counter;
                counter++;

                // Timed offer: gives up after 100 ms rather than blocking indefinitely.
                boolean accepted = queue.offer(item, 100, TimeUnit.MILLISECONDS);
                if (!accepted) {
                    System.out.println(name + " failed to produce: " + item + " (queue full)");
                } else {
                    System.out.println(name + " produced: " + item);
                }

                int pauseMillis = 200 + (int) (Math.random() * 300);
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException interrupted) {
            self.interrupt();
        }
    }
}
/**
 * Runnable that polls strings from a queue until its thread is interrupted.
 */
class StringConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String name;

    /**
     * @param queue queue to poll items from
     * @param name  label used in log output
     */
    public StringConsumer(BlockingQueue<String> queue, String name) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * While the thread is not interrupted: polls with a 500 ms timeout,
     * reports either the consumed item or that the queue was empty, then
     * sleeps for 300-699 ms. An interrupt during a blocking call ends the
     * loop and is re-asserted on the thread.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        try {
            while (!self.isInterrupted()) {
                // Timed poll: returns null if nothing arrives within 500 ms.
                String item = queue.poll(500, TimeUnit.MILLISECONDS);
                if (item == null) {
                    System.out.println(name + " found queue empty");
                } else {
                    System.out.println(name + " consumed: " + item);
                }

                int pauseMillis = 300 + (int) (Math.random() * 400);
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException interrupted) {
            self.interrupt();
        }
    }
}

Advanced Usage: Batching with BlockingQueue

import java.util.concurrent.*;
import java.util.*;
/**
 * Collects {@link DataItem}s from a {@link BlockingQueue} into batches of at
 * most {@code batchSize} elements and processes each batch on a background
 * thread.
 *
 * <p>After {@link #stopProcessing()} the worker keeps going until the queue
 * is empty, still filling batches up to {@code batchSize} from whatever is
 * already queued.
 */
public class BatchProcessor {
    private final BlockingQueue<DataItem> queue;
    private final int batchSize;
    private final long timeoutMs;
    /** Cleared by {@link #stopProcessing()}; volatile so the worker thread sees the change. */
    private volatile boolean running = true;

    /**
     * @param queue     source of items to batch
     * @param batchSize maximum number of items per batch
     * @param timeoutMs how long, in milliseconds, to wait for the first item of a batch
     */
    public BatchProcessor(BlockingQueue<DataItem> queue, int batchSize, long timeoutMs) {
        this.queue = queue;
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
    }

    /** Starts a new thread that runs the batching loop. */
    public void startProcessing() {
        Thread processor = new Thread(this::processBatches);
        processor.start();
    }

    /** Requests shutdown; items already queued are still processed. */
    public void stopProcessing() {
        running = false;
    }

    /**
     * Batching loop: waits up to {@code timeoutMs} for a first item, gathers
     * further items up to {@code batchSize}, then processes the batch.
     * Ends once shutdown was requested and the queue is empty, or when the
     * thread is interrupted.
     */
    private void processBatches() {
        List<DataItem> batch = new ArrayList<>(batchSize);
        while (running || !queue.isEmpty()) {
            try {
                batch.clear();
                // Wait for first element
                DataItem firstItem = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (firstItem == null) {
                    continue; // Timed out; re-check the loop condition
                }
                batch.add(firstItem);
                // Try to gather more items up to batchSize
                while (batch.size() < batchSize) {
                    // While running, give stragglers 100 ms to arrive. During
                    // shutdown take only what is already queued, so the
                    // backlog is still drained in full batches.
                    DataItem item = running
                            ? queue.poll(100, TimeUnit.MILLISECONDS)
                            : queue.poll();
                    if (item == null) {
                        break; // Nothing more available for this batch
                    }
                    batch.add(item);
                }
                // Process the batch
                processBatch(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Prints the batch and sleeps 100 ms to simulate work. An interrupt
     * during the sleep is re-asserted on the thread.
     *
     * @param batch items to process
     */
    private void processBatch(List<DataItem> batch) {
        System.out.println("Processing batch of " + batch.size() + " items: " + batch);
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds an item to the underlying queue, blocking if the queue is full.
     *
     * @param item item to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void addItem(DataItem item) throws InterruptedException {
        queue.put(item);
    }

    /**
     * Demo: feeds 23 items at 150 ms intervals into a processor with batch
     * size 5 and a 1 s first-item timeout, then stops it.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<DataItem> queue = new LinkedBlockingQueue<>();
        BatchProcessor processor = new BatchProcessor(queue, 5, 1000);
        processor.startProcessing();
        // Add items
        for (int i = 0; i < 23; i++) {
            processor.addItem(new DataItem("Item-" + i));
            Thread.sleep(150); // Fixed pause between items
        }
        Thread.sleep(2000); // Let processor finish
        processor.stopProcessing();
    }
}
/** Immutable wrapper around a single string payload. */
class DataItem {
    private final String data;

    /**
     * @param data payload carried by this item
     */
    public DataItem(String data) {
        this.data = data;
    }

    /** @return the payload exactly as given at construction */
    @Override
    public String toString() {
        return this.data;
    }
}

Key Differences and When to Use Each

| Implementation | Bounded | Ordering | Performance | Use Cases |
|---|---|---|---|---|
| ArrayBlockingQueue | Yes | FIFO | Good for fixed size | Fixed-size buffers, resource pools |
| LinkedBlockingQueue | Optional | FIFO | Good for variable size | General purpose, task queues |
| PriorityBlockingQueue | No | Priority | O(log n) for insert | Task scheduling, priority processing |
| SynchronousQueue | Yes (0 capacity) | FIFO/LIFO | Direct handoff | Direct thread communication |
| DelayQueue | No | Delay-based | O(log n) for insert | Scheduling, timeout handling |

Best Practices

  1. Choose the right implementation based on your ordering and bounding needs
  2. Always handle InterruptedException properly
  3. Use timeouts to avoid indefinite blocking
  4. Consider capacity carefully for bounded queues
  5. Monitor queue sizes in production applications
  6. Use appropriate queue for the workload characteristics

BlockingQueue implementations are essential building blocks for concurrent applications in Java, providing robust thread-safe communication between producers and consumers.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper