RabbitMQ Messaging in Java

Introduction to RabbitMQ

RabbitMQ is a robust, open-source message broker that implements the Advanced Message Queuing Protocol (AMQP 0-9-1). It enables asynchronous communication between distributed systems by accepting messages from producers, routing them through exchanges, and queuing them until consumers are ready to process them.

Core Concepts

Key RabbitMQ Components

  • Producer: Application that sends messages
  • Consumer: Application that receives messages
  • Queue: Buffer that stores messages
  • Exchange: Receives messages from producers and routes them to queues
  • Binding: Relationship between exchange and queue
  • Message: Data sent between applications

Exchange Types

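  1. Direct: Routes a message to every queue whose binding key exactly matches the message's routing key
  2. Fanout: Broadcasts a message to all bound queues, ignoring the routing key
  3. Topic: Routes by matching the routing key against binding patterns, where * stands for exactly one word and # for zero or more words
  4. Headers: Routes based on message header values instead of the routing key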
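
The difference between direct and fanout routing can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions: ExchangeRoutingSketch and its methods are hypothetical names, not part of the RabbitMQ client, and a real broker performs this matching on the server.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of exchange routing; not a RabbitMQ API.
final class ExchangeRoutingSketch {
    enum Type { DIRECT, FANOUT }

    // One binding: a queue name plus the binding key it was bound with.
    private static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final Type type;
    private final List<Binding> bindings = new ArrayList<>();

    ExchangeRoutingSketch(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.add(new Binding(queue, bindingKey));
    }

    // Returns the queues that would receive a message published with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            // Fanout ignores the key; direct requires an exact match.
            boolean matches = type == Type.FANOUT || b.bindingKey.equals(routingKey);
            if (matches && !targets.contains(b.queue)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeRoutingSketch direct = new ExchangeRoutingSketch(Type.DIRECT);
        direct.bind("errors", "error");
        direct.bind("everything", "error");
        direct.bind("everything", "info");
        System.out.println("direct/error -> " + direct.route("error"));
        System.out.println("direct/debug -> " + direct.route("debug"));

        ExchangeRoutingSketch fanout = new ExchangeRoutingSketch(Type.FANOUT);
        fanout.bind("q1", "");
        fanout.bind("q2", "");
        System.out.println("fanout/any   -> " + fanout.route("any"));
    }
}
```

A routing key with no matching binding yields an empty target list, which mirrors how an unroutable message is dropped by default.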

Setup and Dependencies

Maven Dependencies

<dependencies>
    <!-- RabbitMQ Java Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <!-- SLF4J API for logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.6</version>
    </dependency>
    <!-- SLF4J Simple Implementation -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.6</version>
    </dependency>
</dependencies>

Connection Factory Utility

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnection {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";
    private static final String VIRTUAL_HOST = "/";

    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
        // Connection settings
        factory.setConnectionTimeout(30000);      // milliseconds
        factory.setRequestedHeartbeat(60);        // seconds
        factory.setNetworkRecoveryInterval(5000); // milliseconds
        return factory.newConnection();
    }

    public static Connection getConnection(String host, int port,
                                           String username, String password)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory.newConnection();
    }
}
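
The utility above hard-codes its connection settings. One possible alternative, sketched here with the standard library only, is to resolve them from environment variables and fall back to the same defaults. The class name BrokerSettings and the variable names RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD and RABBITMQ_VHOST are assumptions made for this example, not names the RabbitMQ client reads on its own.

```java
import java.util.Map;

// Hypothetical settings holder; the environment variable names are assumptions.
final class BrokerSettings {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private BrokerSettings(String host, int port, String username,
                           String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    // Reads each setting from the given map, using the tutorial's defaults when absent.
    static BrokerSettings fromEnvironment(Map<String, String> env) {
        return new BrokerSettings(
                env.getOrDefault("RABBITMQ_HOST", "localhost"),
                Integer.parseInt(env.getOrDefault("RABBITMQ_PORT", "5672")),
                env.getOrDefault("RABBITMQ_USERNAME", "guest"),
                env.getOrDefault("RABBITMQ_PASSWORD", "guest"),
                env.getOrDefault("RABBITMQ_VHOST", "/"));
    }

    public static void main(String[] args) {
        BrokerSettings settings = fromEnvironment(System.getenv());
        System.out.println(settings.host + ":" + settings.port
                + " vhost=" + settings.virtualHost);
    }
}
```

The resolved values could then be passed to the four-argument getConnection overload shown above, or to the ConnectionFactory setters directly.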

Basic Messaging Patterns

1. Simple Producer-Consumer

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class SimpleProducerConsumer {
private static final String QUEUE_NAME = "hello_queue";
// Simple Producer
public static class HelloWorldProducer {
public void sendMessage(String message) throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
// Declare a queue (creates if doesn't exist)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Publish message
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
// Simple Consumer
public static class HelloWorldConsumer {
public void startConsuming() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// Declare queue (same as producer)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// Callback for message delivery
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// Start consuming
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start consumer in separate thread
Thread consumerThread = new Thread(() -> {
try {
new HelloWorldConsumer().startConsuming();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
consumerThread.start();
// Give consumer time to start
Thread.sleep(2000);
// Send messages
HelloWorldProducer producer = new HelloWorldProducer();
for (int i = 1; i <= 5; i++) {
producer.sendMessage("Hello World " + i);
Thread.sleep(1000);
}
// Keep program running
Thread.sleep(5000);
}
}
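
Message bodies travel as raw bytes, so producer and consumer need to agree on a character set. The following sketch, using only the standard library, shows a UTF-8 round trip of the kind the consumer callback above relies on. BodyEncodingSketch and its two helper methods are hypothetical names introduced for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers showing an explicit UTF-8 round trip for message bodies.
final class BodyEncodingSketch {
    // What a producer would hand to basicPublish as the body.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What a consumer would do with the bytes from delivery.getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9";
        byte[] body = encode(original);
        // The accented character takes two bytes in UTF-8, so the body is
        // longer than the string has characters.
        System.out.println(original.length() + " chars, " + body.length + " bytes");
        System.out.println("round trip ok: " + original.equals(decode(body)));
    }
}
```

Calling getBytes() without an argument uses the platform default charset, which may differ between the machine that publishes and the one that consumes.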

2. Work Queue (Task Distribution)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkQueueExample {
private static final String TASK_QUEUE_NAME = "task_queue";
public static class TaskProducer {
public void sendTask(String task) throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
// Make queue durable
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// Make message persistent
AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", TASK_QUEUE_NAME, properties, task.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent task: '" + task + "'");
}
}
}
public static class Worker {
private final String workerName;
public Worker(String workerName) {
this.workerName = workerName;
}
public void startWorking() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// Make queue durable
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(workerName + " waiting for tasks...");
// Process one message at a time
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(workerName + " started task: '" + task + "'");
try {
doWork(task);
} finally {
System.out.println(workerName + " completed task: '" + task + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// Auto-ack set to false for manual acknowledgment
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private void doWork(String task) {
// Simulate task processing time based on dots in message
int processingTime = task.chars().map(ch -> ch == '.' ? 1 : 0).sum();
try {
Thread.sleep(processingTime * 1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start multiple workers
Thread worker1 = new Thread(() -> {
try {
new Worker("Worker-1").startWorking();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread worker2 = new Thread(() -> {
try {
new Worker("Worker-2").startWorking();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
worker1.start();
worker2.start();
// Give workers time to start
Thread.sleep(2000);
// Send tasks
TaskProducer producer = new TaskProducer();
String[] tasks = {
"First task...",
"Second task..",
"Third task...",
"Fourth task.",
"Fifth task...."
};
for (String task : tasks) {
producer.sendTask(task);
}
// Keep program running
Thread.sleep(10000);
}
}
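
The effect of basicQos(1) in the worker above can be approximated with a small simulation. It is a simplified sketch, not a model of the broker's actual scheduler: DispatchSketch is a hypothetical name, and each task's duration equals the number of dots in the task strings used in main above.

```java
// Hypothetical simulation comparing blind round-robin dispatch with
// "give the next task to whichever worker is free first" (prefetch of 1).
final class DispatchSketch {
    // Round-robin: task i always goes to worker i % workers, regardless of load.
    static int roundRobinMakespan(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < durations.length; i++) {
            busyUntil[i % workers] += durations[i];
        }
        return max(busyUntil);
    }

    // Prefetch-1 style: each task goes to the worker that becomes idle earliest.
    static int fairDispatchMakespan(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int duration : durations) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += duration;
        }
        return max(busyUntil);
    }

    private static int max(int[] values) {
        int result = 0;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] durations = {3, 2, 3, 1, 4}; // seconds, one per task
        System.out.println("round-robin: " + roundRobinMakespan(durations, 2) + "s");   // prints 10s
        System.out.println("prefetch 1:  " + fairDispatchMakespan(durations, 2) + "s"); // prints 8s
    }
}
```

With round-robin, one worker ends up with all three long tasks while the other sits idle; limiting unacknowledged messages to one per worker lets the idle worker pick up the slack.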

Advanced Exchange Patterns

3. Publish/Subscribe (Fanout Exchange)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class PubSubExample {
private static final String EXCHANGE_NAME = "logs";
public static class LogsProducer {
public void broadcastMessage(String message) throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
// Declare fanout exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// Publish to exchange (no routing key needed for fanout)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent broadcast: '" + message + "'");
}
}
}
public static class LogsConsumer {
private final String consumerName;
public LogsConsumer(String consumerName) {
this.consumerName = consumerName;
}
public void startListening() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// Declare fanout exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// Create temporary queue (auto-delete, exclusive)
String queueName = channel.queueDeclare().getQueue();
// Bind queue to exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(consumerName + " waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(consumerName + " received: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start multiple consumers
Thread consumer1 = new Thread(() -> {
try {
new LogsConsumer("Consumer-1").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread consumer2 = new Thread(() -> {
try {
new LogsConsumer("Consumer-2").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread consumer3 = new Thread(() -> {
try {
new LogsConsumer("Consumer-3").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
consumer1.start();
consumer2.start();
consumer3.start();
// Give consumers time to start
Thread.sleep(2000);
// Broadcast messages
LogsProducer producer = new LogsProducer();
for (int i = 1; i <= 3; i++) {
producer.broadcastMessage("Log message #" + i);
Thread.sleep(1000);
}
Thread.sleep(5000);
}
}

4. Routing (Direct Exchange)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RoutingExample {
private static final String EXCHANGE_NAME = "direct_logs";
public static class RoutingProducer {
public void sendMessage(String severity, String message) 
throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
// Declare direct exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Publish with routing key
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
public static class RoutingConsumer {
private final String consumerName;
private final String[] severities;
public RoutingConsumer(String consumerName, String... severities) {
this.consumerName = consumerName;
this.severities = severities;
}
public void startListening() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// Declare direct exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Create temporary queue
String queueName = channel.queueDeclare().getQueue();
// Bind for each severity
for (String severity : severities) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(consumerName + " bound to severity: " + severity);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(consumerName + " received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start consumers with different routing interests
Thread errorConsumer = new Thread(() -> {
try {
new RoutingConsumer("Error-Logger", "error").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread warningConsumer = new Thread(() -> {
try {
new RoutingConsumer("Warning-Logger", "warning", "error").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread infoConsumer = new Thread(() -> {
try {
new RoutingConsumer("Info-Logger", "info", "warning", "error").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
errorConsumer.start();
warningConsumer.start();
infoConsumer.start();
Thread.sleep(2000);
// Send messages with different severities
RoutingProducer producer = new RoutingProducer();
producer.sendMessage("info", "This is an info message");
producer.sendMessage("warning", "This is a warning message");
producer.sendMessage("error", "This is an error message");
producer.sendMessage("debug", "This is a debug message"); // No consumer for debug
Thread.sleep(5000);
}
}

5. Topic Exchange

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicExchangeExample {
private static final String EXCHANGE_NAME = "topic_logs";
public static class TopicProducer {
public void sendMessage(String routingKey, String message) 
throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
public static class TopicConsumer {
private final String consumerName;
private final String[] bindingKeys;
public TopicConsumer(String consumerName, String... bindingKeys) {
this.consumerName = consumerName;
this.bindingKeys = bindingKeys;
}
public void startListening() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
for (String bindingKey : bindingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println(consumerName + " bound with key: " + bindingKey);
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(consumerName + " received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start consumers with different topic patterns
Thread ordersConsumer = new Thread(() -> {
try {
new TopicConsumer("Orders-Processor", "order.*").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread paymentsConsumer = new Thread(() -> {
try {
new TopicConsumer("Payments-Processor", "payment.*").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread notificationsConsumer = new Thread(() -> {
try {
new TopicConsumer("Notifications-Processor", "*.notification").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread allConsumer = new Thread(() -> {
try {
new TopicConsumer("All-Messages", "#").startListening();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
ordersConsumer.start();
paymentsConsumer.start();
notificationsConsumer.start();
allConsumer.start();
Thread.sleep(2000);
// Send messages with different routing keys
TopicProducer producer = new TopicProducer();
producer.sendMessage("order.created", "New order #123 created");
producer.sendMessage("order.shipped", "Order #123 shipped");
producer.sendMessage("payment.received", "Payment received for order #123");
producer.sendMessage("payment.failed", "Payment failed for order #124");
producer.sendMessage("system.notification", "System maintenance scheduled");
producer.sendMessage("user.notification", "Welcome email sent");
Thread.sleep(5000);
}
}
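
The binding keys used above ("order.*", "*.notification", "#") follow the topic exchange rules: words are separated by dots, * matches exactly one word, and # matches zero or more words. A minimal sketch of that matching logic follows. TopicMatcherSketch is a hypothetical helper written for illustration; the broker does the real matching on the server.

```java
// Hypothetical re-implementation of topic binding-key matching, for illustration only.
final class TopicMatcherSketch {
    static boolean matches(String bindingKey, String routingKey) {
        // The -1 limit keeps trailing empty words, so "a." is two words.
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: match only if the key is too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.created"));              // true
        System.out.println(matches("order.*", "order.created.eu"));           // false
        System.out.println(matches("*.notification", "system.notification")); // true
        System.out.println(matches("#", "payment.failed"));                   // true
    }
}
```

Because * covers exactly one word, "order.*" does not receive a three-word key such as "order.created.eu"; a binding of "order.#" would.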

RPC (Request-Reply) Pattern

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCExample {
private static final String RPC_QUEUE_NAME = "rpc_queue";
// RPC Client
public static class RPCClient {
private Connection connection;
private Channel channel;
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
connection = RabbitMQConnection.getConnection();
channel = connection.createChannel();
// Create temporary callback queue
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
// Setup reply properties
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
// Publish request
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes(StandardCharsets.UTF_8));
// Wait for response
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, 
(consumerTag, delivery) -> {
// Compare from the known non-null side so a reply without a correlation id is ignored
if (corrId.equals(delivery.getProperties().getCorrelationId())) {
response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, consumerTag -> {});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
// RPC Server
public static class RPCServer {
public void start() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [.] Processing: " + message);
response = processRequest(message);
} catch (RuntimeException e) {
System.out.println(" [.] Error: " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), 
replyProps, response.getBytes(StandardCharsets.UTF_8));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private String processRequest(String request) {
// Simulate processing
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + request.toUpperCase();
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start RPC server
Thread serverThread = new Thread(() -> {
try {
new RPCServer().start();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
serverThread.start();
Thread.sleep(2000);
// Make RPC calls
RPCClient client = new RPCClient();
System.out.println(" [x] Requesting processing for 'hello'");
String response = client.call("hello");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting processing for 'world'");
response = client.call("world");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting processing for 'rabbitmq'");
response = client.call("rabbitmq");
System.out.println(" [.] Got '" + response + "'");
client.close();
}
}
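
The client above handles one call at a time and registers a new consumer for every request. One way to support several in-flight calls is to keep a map from correlation id to a pending future and complete the right one as each reply arrives. The sketch below shows only that bookkeeping, with the standard library; ReplyCorrelatorSketch and its methods are hypothetical, and wiring onReply into a single long-lived reply consumer is left out.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for matching RPC replies to requests by correlation id.
final class ReplyCorrelatorSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing: returns the correlation id to put in the request properties.
    String register() {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<String>());
        return correlationId;
    }

    // The future the caller waits on; null if the id is unknown or already answered.
    CompletableFuture<String> futureFor(String correlationId) {
        return pending.get(correlationId);
    }

    // Call from the reply consumer: returns false for unknown, duplicate, or missing ids.
    boolean onReply(String correlationId, String body) {
        if (correlationId == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelatorSketch correlator = new ReplyCorrelatorSketch();
        String id = correlator.register();
        CompletableFuture<String> reply = correlator.futureFor(id);

        correlator.onReply("some-other-id", "ignored"); // stale reply is dropped
        correlator.onReply(id, "Processed: HELLO");     // matching reply completes the future
        System.out.println(reply.join());
    }
}
```

Removing the entry when the reply arrives means a duplicate delivery of the same reply is ignored rather than completing a future twice.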

Message Reliability and Error Handling

Reliable Messaging with Confirmations

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class ReliableMessagingExample {
private static final String RELIABLE_QUEUE = "reliable_queue";
public static class ReliableProducer {
private final Channel channel;
private final ConcurrentNavigableMap<Long, String> outstandingConfirms;
public ReliableProducer() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
this.channel = connection.createChannel();
this.outstandingConfirms = new ConcurrentSkipListMap<>();
// Enable publisher confirmations
channel.confirmSelect();
// Setup confirmation listeners
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
handleConfirmation(deliveryTag, multiple, true);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
handleConfirmation(deliveryTag, multiple, false);
}
private void handleConfirmation(long deliveryTag, boolean multiple, boolean ack) {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = 
outstandingConfirms.headMap(deliveryTag, true);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
String status = ack ? "ACK" : "NACK";
System.out.println("Message " + deliveryTag + " " + status);
}
});
// Declare durable queue
channel.queueDeclare(RELIABLE_QUEUE, true, false, false, null);
}
public void sendReliableMessage(String message)
throws IOException, InterruptedException, TimeoutException {
long sequenceNumber = channel.getNextPublishSeqNo();
outstandingConfirms.put(sequenceNumber, message);
// Send persistent message
AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", RELIABLE_QUEUE, properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Sent message #" + sequenceNumber + ": " + message);
// Block until the broker confirms. waitForConfirms returns false if a
// message was nacked and throws TimeoutException if the timeout elapses
if (!channel.waitForConfirms(5000)) {
System.out.println("Message #" + sequenceNumber + " was nacked by the broker!");
}
}
public void close() throws IOException, TimeoutException {
channel.close();
channel.getConnection().close();
}
}
public static class ReliableConsumer {
public void startConsuming() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// Declare durable queue
channel.queueDeclare(RELIABLE_QUEUE, true, false, false, null);
// Process one message at a time
channel.basicQos(1);
System.out.println(" [*] Waiting for reliable messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received: '" + message + "'");
try {
processMessage(message);
// Acknowledge message
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Message processed and acknowledged");
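} catch (Exception e) {
System.out.println(" [x] Message processing failed: " + e.getMessage());
// Requeue only on the first failure. A message that fails again is
// discarded (or dead-lettered if the queue has a dead letter exchange),
// which avoids an endless redelivery loop for messages that always fail
boolean requeue = !delivery.getEnvelope().isRedeliver();
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, requeue);
}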
};
channel.basicConsume(RELIABLE_QUEUE, false, deliverCallback, consumerTag -> {});
}
private void processMessage(String message) throws Exception {
// Simulate processing
Thread.sleep(1000);
// Simulate occasional failure
if (message.contains("fail")) {
throw new Exception("Simulated processing failure");
}
}
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Start consumer
Thread consumerThread = new Thread(() -> {
try {
new ReliableConsumer().startConsuming();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
consumerThread.start();
Thread.sleep(2000);
// Send reliable messages
ReliableProducer producer = new ReliableProducer();
producer.sendReliableMessage("Reliable message 1");
producer.sendReliableMessage("This message will fail processing");
producer.sendReliableMessage("Reliable message 3");
Thread.sleep(5000);
producer.close();
}
}
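
The confirm listener above relies on a sorted map so that a confirmation with multiple set to true can clear every sequence number up to and including the given tag in one step. The following standalone sketch isolates that bookkeeping with the standard library; ConfirmTrackerSketch and its method names are hypothetical, and no broker is involved.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical tracker mirroring the outstandingConfirms handling, without a broker.
final class ConfirmTrackerSketch {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Record a message under its publish sequence number before sending it.
    void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Apply one confirmation; 'multiple' means "this tag and every earlier one".
    void confirmed(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headMap is a live view, so clearing it removes the entries from the map.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    boolean isOutstanding(long sequenceNumber) {
        return outstanding.containsKey(sequenceNumber);
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "message " + seq);
        }
        tracker.confirmed(3, true);  // confirms 1, 2 and 3 together
        tracker.confirmed(5, false); // confirms 5 only
        System.out.println("still outstanding: " + tracker.outstandingCount()); // prints 1
    }
}
```

Whatever remains in the map after a reasonable wait is a candidate for republishing.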

Spring AMQP Integration

Spring Boot Configuration

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@SpringBootApplication
@EnableScheduling
public class SpringRabbitMQApplication {
// Queue Definitions
@Bean
public Queue userQueue() {
return new Queue("user.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.queue", true);
}
// Exchange Definitions
@Bean
public TopicExchange userExchange() {
return new TopicExchange("user.exchange");
}
// Binding Definitions
@Bean
public Binding userBinding(Queue userQueue, TopicExchange userExchange) {
return BindingBuilder.bind(userQueue)
.to(userExchange)
.with("user.*");
}
@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange userExchange) {
return BindingBuilder.bind(emailQueue)
.to(userExchange)
.with("user.created");
}
// Message Converter for JSON
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
public static void main(String[] args) {
SpringApplication.run(SpringRabbitMQApplication.class, args);
}
}
// Message Producer
@Component
class UserEventProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUserCreatedEvent(User user) {
rabbitTemplate.convertAndSend("user.exchange", "user.created", user);
System.out.println("Sent user created event: " + user.getEmail());
}
public void sendUserUpdatedEvent(User user) {
rabbitTemplate.convertAndSend("user.exchange", "user.updated", user);
System.out.println("Sent user updated event: " + user.getEmail());
}
}
// Message Consumers
@Component
class UserEventConsumer {
@RabbitListener(queues = "user.queue")
public void handleUserEvent(User user, @Header("amqp_receivedRoutingKey") String routingKey) {
System.out.println("Received user event [" + routingKey + "]: " + user.getEmail());
// Process user event
if ("user.created".equals(routingKey)) {
processUserCreation(user);
} else if ("user.updated".equals(routingKey)) {
processUserUpdate(user);
}
}
private void processUserCreation(User user) {
System.out.println("Processing new user: " + user.getEmail());
}
private void processUserUpdate(User user) {
System.out.println("Processing user update: " + user.getEmail());
}
}
@Component
class EmailServiceConsumer {
@RabbitListener(queues = "email.queue")
public void handleNewUserEmail(User user) {
System.out.println("Sending welcome email to: " + user.getEmail());
// Send welcome email logic
}
}
// DTO Class
class User {
private String email;
private String name;
// Constructors, getters, setters
public User() {}
public User(String email, String name) {
this.email = email;
this.name = name;
}
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
}
// Test Component
@Component
class TestMessageSender {
@Autowired
private UserEventProducer userEventProducer;
@Scheduled(fixedDelay = 5000)
public void sendTestMessages() {
User user = new User("test@example.com", "Test User"); // placeholder address
userEventProducer.sendUserCreatedEvent(user);
}
}

Best Practices and Patterns

1. Connection and Channel Management

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManagement {
private static class RabbitMQManager {
private Connection connection;
private Channel channel;
private final Object lock = new Object();
public Channel getChannel() throws IOException, TimeoutException {
synchronized (lock) {
if (connection == null || !connection.isOpen()) {
connection = RabbitMQConnection.getConnection();
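// Log shutdowns. The Java client (4.0 and later) enables automatic
// connection recovery by default, so custom reconnection logic is
// only needed when that feature is turned off
connection.addShutdownListener(cause -> {
System.out.println("Connection shutdown: " + cause.getMessage());
});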
}
if (channel == null || !channel.isOpen()) {
channel = connection.createChannel();
// Setup channel recovery
channel.addShutdownListener(cause -> {
System.out.println("Channel shutdown: " + cause.getMessage());
});
}
return channel;
}
}
public void close() throws IOException, TimeoutException {
synchronized (lock) {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
}

2. Error Handling and Retry

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ErrorHandlingExample {
private static final String MAIN_QUEUE = "main.queue";
private static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
private static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static void setupDeadLetterQueue() throws IOException, TimeoutException {
try (Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel()) {
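// Dead letter exchange. Fanout is used so that messages dead-lettered by
// the broker, which keep their original routing key, still reach the queue
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);
channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "");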
// Main queue with dead letter configuration
java.util.Map<String, Object> args = new java.util.HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-max-length", 10000); // Limit queue size
channel.queueDeclare(MAIN_QUEUE, true, false, false, args);
}
}
public static class ResilientConsumer {
private static final int MAX_RETRIES = 3;
public void startConsuming() throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
setupDeadLetterQueue();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
AMQP.BasicProperties properties = delivery.getProperties();
try {
processMessageWithRetry(message, properties);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("Message processing failed after retries: " + e.getMessage());
// Send to dead letter queue
// Forward the original bytes unchanged rather than re-encoding the string
channel.basicPublish(DEAD_LETTER_EXCHANGE, "", properties, delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(MAIN_QUEUE, false, deliverCallback, consumerTag -> {});
}
private void processMessageWithRetry(String message, AMQP.BasicProperties properties) 
throws Exception {
int attempt = 0;
while (attempt < MAX_RETRIES) {
try {
processMessage(message);
return; // Success
} catch (Exception e) {
attempt++;
if (attempt == MAX_RETRIES) {
throw e; // Final failure
}
System.out.println("Attempt " + attempt + " failed, retrying...");
TimeUnit.SECONDS.sleep(1L << attempt); // Exponential backoff: 2s, 4s, ...
}
}
}
private void processMessage(String message) throws Exception {
// Simulate processing that might fail
if (Math.random() < 0.3) {
throw new Exception("Random processing failure");
}
System.out.println("Successfully processed: " + message);
}
}
}
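
The retry loop above waits longer after each failed attempt. A capped exponential backoff can be expressed as a small pure function, sketched here with the standard library. BackoffSketch, delayMillis and the base and maximum values are hypothetical choices for illustration, not settings taken from the RabbitMQ client.

```java
// Hypothetical helper computing a capped exponential backoff delay.
final class BackoffSketch {
    // attempt is 1-based: attempt 1 waits baseMillis, attempt 2 twice that, and so on.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Limit the exponent so the multiplication stays in range for
        // realistic base delays (a few seconds at most).
        int exponent = Math.min(attempt - 1, 30);
        long delay = baseMillis * (1L << exponent);
        return Math.min(delay, maxMillis);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + delayMillis(attempt, 1000, 30000) + " ms");
        }
    }
}
```

The cap keeps a long outage from producing waits of many minutes; adding random jitter is a common further refinement when many consumers retry at once.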

Monitoring and Management

Connection Monitoring

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MonitoringExample {
public static class MonitoredConnection {
private Connection connection;
private Channel channel;
public void startWithMonitoring() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Add monitoring listeners
connection.addShutdownListener(cause -> {
if (!cause.isInitiatedByApplication()) {
System.err.println("Unexpected connection shutdown: " + cause.getMessage());
// Implement reconnection logic
attemptReconnection();
}
});
connection.addBlockedListener(new BlockedListener() {
@Override
public void handleBlocked(String reason) {
System.out.println("Connection blocked: " + reason);
// Reduce message production rate
}
@Override
public void handleUnblocked() {
System.out.println("Connection unblocked");
// Resume normal production rate
}
});
}
private void attemptReconnection() {
// Implement exponential backoff reconnection logic
System.out.println("Attempting reconnection...");
}
public void close() throws IOException, TimeoutException {
if (channel != null && channel.isOpen()) channel.close();
if (connection != null && connection.isOpen()) connection.close();
}
}
}
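
The blocked and unblocked callbacks above only log. One simple way to act on them, sketched below with the standard library, is a gate that the callbacks toggle and that the producer checks before each publish. PublishGateSketch is a hypothetical class; in a real application its onBlocked and onUnblocked methods would be called from handleBlocked and handleUnblocked.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical gate that pauses publishing while the broker reports the connection as blocked.
final class PublishGateSketch {
    private final AtomicBoolean blocked = new AtomicBoolean(false);

    // To be called from BlockedListener.handleBlocked.
    void onBlocked(String reason) {
        blocked.set(true);
    }

    // To be called from BlockedListener.handleUnblocked.
    void onUnblocked() {
        blocked.set(false);
    }

    // Runs the publish action unless the gate is closed; returns whether it ran.
    boolean tryPublish(Runnable publishAction) {
        if (blocked.get()) {
            return false; // caller may buffer the message or retry later
        }
        publishAction.run();
        return true;
    }

    public static void main(String[] args) {
        PublishGateSketch gate = new PublishGateSketch();
        System.out.println(gate.tryPublish(() -> System.out.println("published 1")));
        gate.onBlocked("memory alarm");
        System.out.println(gate.tryPublish(() -> System.out.println("published 2")));
        gate.onUnblocked();
        System.out.println(gate.tryPublish(() -> System.out.println("published 3")));
    }
}
```

The check and the publish are not atomic, so a message can still slip through just as the connection becomes blocked; the gate reduces pressure on the broker rather than guaranteeing silence.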

Conclusion

RabbitMQ provides powerful messaging capabilities for building distributed, scalable applications. Key takeaways:

  1. Choose the Right Exchange Type: Direct for point-to-point, Fanout for broadcast, Topic for pattern-based routing
  2. Ensure Message Reliability: Use publisher confirms, persistent messages, and manual acknowledgments
  3. Handle Failures Gracefully: Implement dead letter queues and retry mechanisms
  4. Monitor Connections: Use shutdown and blocked connection listeners
  5. Use Spring AMQP: For easier configuration and integration with Spring applications
  6. Consider Performance: Use appropriate QoS (prefetch) settings, and reuse long-lived connections and channels instead of opening a new connection for every message

RabbitMQ's flexibility and robustness make it an excellent choice for enterprise messaging scenarios, from simple task queues to complex event-driven architectures.
