ActiveMQ JMS Implementation in Java

Introduction

ActiveMQ is an open-source message broker that implements the Java Message Service (JMS) API. It supports multiple protocols and provides reliable messaging, high availability, and enterprise-level features for building distributed, asynchronous systems.

Core JMS Concepts

JMS Messaging Models

public class JmsCoreConcepts {
// Point-to-Point (Queue) - One-to-one messaging
// Publish-Subscribe (Topic) - One-to-many messaging
// JMS Message Types:
// - TextMessage
// - ObjectMessage  
// - BytesMessage
// - MapMessage
// - StreamMessage
}
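
The difference between the two models can be illustrated without a broker. The following is a minimal in-memory sketch, not JMS code: the class `MessagingModelSketch` and its methods are hypothetical names introduced only for this illustration, and the round-robin dispatch is a simplification (a real broker dispatches according to consumer prefetch and availability).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the two JMS domains; no broker or JMS API involved.
final class MessagingModelSketch {

    // Point-to-point: each message is handed to exactly one consumer (round-robin here).
    static List<List<String>> deliverToQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    // Publish-subscribe: every subscriber receives its own copy of each message.
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }
}
```

With four messages and two receivers, the queue variant gives each consumer two messages, while the topic variant gives each subscriber all four.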

ActiveMQ Setup and Configuration

Maven Dependencies

<dependencies>
<!-- ActiveMQ Client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.17.4</version>
</dependency>
<!-- Spring JMS for easier integration -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.23</version>
</dependency>
<!-- Connection pooling -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.17.4</version>
</dependency>
</dependencies>

ActiveMQ Connection Configuration

public class ActiveMQConfig {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String USERNAME = "admin";
private static final String PASSWORD = "password";
// Returns the concrete type because the PooledConnectionFactory constructor below requires it
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = 
new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// Configure connection factory
// Trusts every class for ObjectMessage deserialization; prefer setTrustedPackages(...) in production
connectionFactory.setTrustAllPackages(true);
connectionFactory.setUseAsyncSend(true);     // Async sending
connectionFactory.setAlwaysSyncSend(false);  // Don't always use sync
return connectionFactory;
}
public ConnectionFactory pooledConnectionFactory() {
PooledConnectionFactory pooledFactory = 
new PooledConnectionFactory(connectionFactory());
pooledFactory.setMaxConnections(10);
pooledFactory.setMaximumActiveSessionPerConnection(50);
return pooledFactory;
}
}
// Spring-based configuration
@Configuration
@EnableJms
public class JmsConfig {
@Value("${activemq.broker-url:tcp://localhost:61616}")
private String brokerUrl;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = 
new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setTrustAllPackages(true); // Permissive; restrict with setTrustedPackages(...) in production
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDeliveryPersistent(true);
jmsTemplate.setExplicitQosEnabled(true);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = 
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("3-10"); // Min and max consumers
factory.setSessionTransacted(true);
factory.setErrorHandler(t -> {
System.err.println("JMS Error occurred: " + t.getMessage());
});
return factory;
}
}

Queue-Based Messaging (Point-to-Point)

Message Producer

@Component
public class QueueMessageProducer {
private static final String ORDER_QUEUE = "ORDER.QUEUE";
private static final String NOTIFICATION_QUEUE = "NOTIFICATION.QUEUE";
private final JmsTemplate jmsTemplate;
public QueueMessageProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// Send simple text message
public void sendOrderMessage(String orderId, String message) {
jmsTemplate.send(ORDER_QUEUE, session -> {
TextMessage textMessage = session.createTextMessage(message);
textMessage.setStringProperty("orderId", orderId);
textMessage.setJMSCorrelationID(orderId);
// Delivery mode comes from the JmsTemplate (setDeliveryPersistent); a value set on the message is overwritten on send
return textMessage;
});
System.out.println("Sent order message: " + orderId);
}
// Send object message
public void sendOrderObject(Order order) {
jmsTemplate.convertAndSend(ORDER_QUEUE, order, message -> {
message.setStringProperty("orderId", order.getId());
message.setJMSCorrelationID(order.getId());
message.setJMSType("Order");
return message;
});
}
// Send with custom headers and properties
public void sendMessageWithProperties(String queueName, String message, 
Map<String, Object> properties) {
jmsTemplate.send(queueName, session -> {
TextMessage textMessage = session.createTextMessage(message);
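// Set custom properties (a plain loop, because the setters throw the checked JMSException,
// which a forEach lambda cannot propagate)
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String) {
textMessage.setStringProperty(key, (String) value);
} else if (value instanceof Integer) {
textMessage.setIntProperty(key, (Integer) value);
} else if (value instanceof Boolean) {
textMessage.setBooleanProperty(key, (Boolean) value);
}
}
// Delivery mode and priority are producer-level settings: configure them on the JmsTemplate
// (setDeliveryPersistent, setPriority, with explicit QoS enabled); values set on the message
// itself are overwritten on send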
return textMessage;
});
}
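// Request-Reply pattern
// JmsTemplate.sendAndReceive creates the temporary reply queue and sets JMSReplyTo itself,
// then waits for the reply for up to the template's receive timeout
public String sendAndReceive(String message, long timeout) throws JMSException {
jmsTemplate.setReceiveTimeout(timeout); // Note: changes the timeout for every user of this template
Message reply = jmsTemplate.sendAndReceive(ORDER_QUEUE,
session -> session.createTextMessage(message));
// sendAndReceive returns a Message (null on timeout), so extract the text explicitly
return (reply instanceof TextMessage) ? ((TextMessage) reply).getText() : null;
}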
}
// Order entity
class Order implements Serializable {
private String id;
private String customerId;
private BigDecimal amount;
private Date orderDate;
private List<OrderItem> items;
// Constructors, getters, setters
public Order() {}
public Order(String id, String customerId, BigDecimal amount) {
this.id = id;
this.customerId = customerId;
this.amount = amount;
this.orderDate = new Date();
this.items = new ArrayList<>();
}
// Getters and setters...
}
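
An ObjectMessage body travels as a Java-serialized object, which is why Order implements Serializable and why the client must trust the class when deserializing. The round trip can be sketched with the JDK alone. This is an illustration under stated assumptions: `SerializationRoundTrip` and `SampleOrder` are hypothetical names, and `SampleOrder` is a two-field stand-in for the Order class above, not the article's class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;

// Hypothetical helper: shows the Java serialization round trip an ObjectMessage body relies on.
final class SerializationRoundTrip {

    // Hypothetical stand-in for the Order class (only two fields).
    static final class SampleOrder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final BigDecimal amount;

        SampleOrder(String id, BigDecimal amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Producer side: the object graph is turned into bytes.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: the class must be present on the consumer's classpath.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class missing on the consumer side", e);
        }
    }
}
```

Because producer and consumer must share the same class definition, many teams prefer a text format such as JSON in a TextMessage when the two sides are deployed independently.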

Message Consumer

@Component
public class QueueMessageConsumer {
private static final String ORDER_QUEUE = "ORDER.QUEUE";
private static final String NOTIFICATION_QUEUE = "NOTIFICATION.QUEUE";
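// Message listener using JMS annotation
// Note: several listeners in this article read from ORDER_QUEUE for illustration; on a real
// queue they would compete for messages, so use one listener per queue or disjoint selectors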
@JmsListener(destination = ORDER_QUEUE)
public void receiveOrderMessage(TextMessage message) throws JMSException {
try {
String orderId = message.getStringProperty("orderId");
String text = message.getText();
System.out.println("Received order message - ID: " + orderId + 
", Content: " + text);
// Process the message
processOrder(orderId, text);
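// With the transacted container, the receive is committed if no exception is thrown
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// Preserve the cause; the exception rolls the session back so the message is redelivered
JMSException failure = new JMSException("Processing failed: " + e.getMessage());
failure.setLinkedException(e);
throw failure;
}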
}
// Object message listener
@JmsListener(destination = ORDER_QUEUE)
public void receiveOrderObject(Order order, Message message) throws JMSException {
String orderId = message.getStringProperty("orderId");
System.out.println("Received order object - ID: " + orderId + 
", Amount: " + order.getAmount());
processOrderObject(order);
}
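// Manual acknowledgment
// Requires a listener container factory with setSessionTransacted(false) and
// setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); on a transacted session
// acknowledge() is ignored and recover() throws IllegalStateException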
@JmsListener(destination = NOTIFICATION_QUEUE)
public void receiveWithManualAck(TextMessage message, 
Session session) throws JMSException {
try {
String content = message.getText();
System.out.println("Processing notification: " + content);
// Business logic
processNotification(content);
// Manual acknowledgment
message.acknowledge();
} catch (Exception e) {
System.err.println("Failed to process notification: " + e.getMessage());
session.recover(); // Redeliver message
}
}
// Single queue filtered with a message selector (messageType is a custom property set by the producer)
@JmsListener(destination = "GENERAL.QUEUE", 
selector = "messageType = 'URGENT'")
public void receiveUrgentMessages(TextMessage message) throws JMSException {
System.out.println("Processing URGENT message: " + message.getText());
processUrgentMessage(message);
}
private void processOrder(String orderId, String orderDetails) {
// Business logic for processing order
System.out.println("Processing order: " + orderId);
// Simulate processing time
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processOrderObject(Order order) {
// Process order object
System.out.println("Processing order for customer: " + order.getCustomerId());
}
private void processNotification(String notification) {
// Process notification
System.out.println("Notification processed: " + notification);
}
private void processUrgentMessage(TextMessage message) throws JMSException {
// Process urgent messages with higher priority
System.out.println("Urgent message processed: " + message.getText());
}
}

Topic-Based Messaging (Publish-Subscribe)

Topic Publisher

@Component
public class TopicMessagePublisher {
private static final String ORDER_TOPIC = "ORDER.TOPIC";
private static final String NOTIFICATION_TOPIC = "NOTIFICATION.TOPIC";
private final JmsTemplate jmsTemplate;
public TopicMessagePublisher(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void publishOrderEvent(OrderEvent event) {
// Send to an explicit topic (org.apache.activemq.command.ActiveMQTopic). Calling
// setPubSubDomain(true) here would switch the shared JmsTemplate to topics for every caller.
jmsTemplate.convertAndSend(new ActiveMQTopic(ORDER_TOPIC), event, message -> {
message.setStringProperty("eventType", event.getType());
message.setStringProperty("orderId", event.getOrderId());
message.setJMSType("OrderEvent");
return message;
});
System.out.println("Published order event: " + event.getType());
}
public void publishNotification(String notification, String category) {
jmsTemplate.send(new ActiveMQTopic(NOTIFICATION_TOPIC), session -> {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("message", notification);
mapMessage.setString("category", category);
mapMessage.setLong("timestamp", System.currentTimeMillis());
// Selectors match properties, not the map body, so expose the category as a property too
mapMessage.setStringProperty("category", category);
// For fast non-persistent delivery, use a JmsTemplate configured with setDeliveryPersistent(false);
// a delivery mode set on the message itself is overwritten on send
return mapMessage;
});
}
// Durable subscriber example
public void sendToDurableSubscriber(String message, String clientId) {
jmsTemplate.send(new ActiveMQTopic("STOCK.TOPIC"), session -> {
TextMessage textMessage = session.createTextMessage(message);
textMessage.setStringProperty("clientId", clientId);
return textMessage;
});
}
}
// Event class for topic messages
class OrderEvent implements Serializable {
private String orderId;
private String type; // CREATED, UPDATED, CANCELLED, COMPLETED
private Date timestamp;
private Map<String, Object> data;
public OrderEvent(String orderId, String type) {
this.orderId = orderId;
this.type = type;
this.timestamp = new Date();
this.data = new HashMap<>();
}
// Getters and setters...
public String getOrderId() { return orderId; }
public String getType() { return type; }
public Date getTimestamp() { return timestamp; }
public Map<String, Object> getData() { return data; }
}

Topic Subscriber

@Component
public class TopicMessageSubscriber {
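// Regular topic subscriber
// Topic listeners need a container factory on which setPubSubDomain(true) has been called;
// a factory left in the default queue domain would treat "ORDER.TOPIC" as a queue name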
@JmsListener(destination = "ORDER.TOPIC", containerFactory = "jmsListenerContainerFactory")
public void subscribeToOrderEvents(OrderEvent event, Message message) throws JMSException {
String eventType = message.getStringProperty("eventType");
System.out.println("Received order event - Type: " + eventType + 
", Order: " + event.getOrderId());
// Process based on event type
switch (eventType) {
case "CREATED":
handleOrderCreated(event);
break;
case "UPDATED":
handleOrderUpdated(event);
break;
case "CANCELLED":
handleOrderCancelled(event);
break;
default:
System.out.println("Unknown event type: " + eventType);
}
}
// Multiple subscribers for the same topic
@JmsListener(destination = "ORDER.TOPIC", containerFactory = "jmsListenerContainerFactory")
public void auditOrderEvents(OrderEvent event) {
System.out.println("AUDIT - Order event: " + event.getType() + 
" for order: " + event.getOrderId());
// Log to audit system
}
// Durable subscriber (the container factory also needs setSubscriptionDurable(true) and a client ID)
@JmsListener(destination = "STOCK.TOPIC", subscription = "stock-monitor", 
containerFactory = "jmsListenerContainerFactory")
public void durableStockSubscriber(TextMessage message) throws JMSException {
String stockInfo = message.getText();
String clientId = message.getStringProperty("clientId");
System.out.println("Durable subscriber received stock update: " + stockInfo);
processStockUpdate(stockInfo, clientId);
}
// Selective subscriber with message selector
@JmsListener(destination = "NOTIFICATION.TOPIC", 
selector = "category = 'ALERT'",
containerFactory = "jmsListenerContainerFactory")
public void receiveAlerts(MapMessage message) throws JMSException {
String alertMessage = message.getString("message");
String category = message.getString("category");
System.out.println("ALERT received: " + alertMessage);
handleAlert(alertMessage);
}
private void handleOrderCreated(OrderEvent event) {
System.out.println("Processing order creation: " + event.getOrderId());
// Send welcome email, initialize tracking, etc.
}
private void handleOrderUpdated(OrderEvent event) {
System.out.println("Processing order update: " + event.getOrderId());
// Update inventory, notify customer, etc.
}
private void handleOrderCancelled(OrderEvent event) {
System.out.println("Processing order cancellation: " + event.getOrderId());
// Restock items, notify departments, etc.
}
private void processStockUpdate(String stockInfo, String clientId) {
System.out.println("Processing stock update for client: " + clientId);
// Update client portfolio, check thresholds, etc.
}
private void handleAlert(String alertMessage) {
System.out.println("Handling alert: " + alertMessage);
// Critical alert processing
}
}
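
What makes a subscription durable is that the broker keeps messages for it while the subscriber is disconnected. The sketch below models only that distinction in memory. `DurableTopicSketch` and its methods are hypothetical names for this illustration; it does not use the JMS API and ignores persistence, client IDs, and expiry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory topic that models only durable versus non-durable subscriptions.
final class DurableTopicSketch {
    private final Map<String, List<String>> pending = new HashMap<>();
    private final Set<String> durableNames = new HashSet<>();

    // Creates the subscription, or reattaches to an existing durable one.
    void subscribe(String name, boolean durable) {
        pending.putIfAbsent(name, new ArrayList<>());
        if (durable) {
            durableNames.add(name);
        }
    }

    // A non-durable subscription disappears on disconnect; a durable one keeps its backlog.
    void disconnect(String name) {
        if (!durableNames.contains(name)) {
            pending.remove(name);
        }
    }

    // Every subscription that currently exists gets a copy of the message.
    void publish(String message) {
        for (List<String> inbox : pending.values()) {
            inbox.add(message);
        }
    }

    // Returns and clears the messages waiting for the given subscription.
    List<String> receiveAll(String name) {
        List<String> inbox = pending.get(name);
        if (inbox == null) {
            return new ArrayList<>();
        }
        List<String> copy = new ArrayList<>(inbox);
        inbox.clear();
        return copy;
    }
}
```

If both a durable and a non-durable subscriber disconnect, a message is published, and both reconnect, only the durable subscriber finds the message waiting.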

Advanced ActiveMQ Features

Message Filtering with Selectors

@Component
public class AdvancedMessageProcessor {
// High priority orders
@JmsListener(destination = "ORDER.QUEUE", 
selector = "priority > 5 AND region = 'EUROPE'")
public void processHighPriorityEuropeanOrders(Order order) {
System.out.println("Processing high priority EU order: " + order.getId());
expediteOrderProcessing(order);
}
// Orders with specific criteria
@JmsListener(destination = "ORDER.QUEUE",
selector = "amount > 1000 AND customerTier = 'PREMIUM'")
public void processLargePremiumOrders(Order order) {
System.out.println("Processing large premium order: " + order.getId());
applyPremiumBenefits(order);
}
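// Time-based filtering
// Annotation values must be compile-time constants, so a sliding time window cannot be
// written into the selector; check the JMSTimestamp header inside the listener instead
@JmsListener(destination = "ORDER.QUEUE")
public void processRecentOrders(Order order, Message message) throws JMSException {
long oneHourAgo = System.currentTimeMillis() - 3600000L;
if (message.getJMSTimestamp() < oneHourAgo) {
return; // Older than one hour: skipped (the message is still consumed)
}
System.out.println("Processing recent order: " + order.getId());
// Process orders from last hour
}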
private void expediteOrderProcessing(Order order) {
// Special processing for high priority orders
System.out.println("Expediting order: " + order.getId());
}
private void applyPremiumBenefits(Order order) {
// Apply premium customer benefits
System.out.println("Applying premium benefits to order: " + order.getId());
}
}
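
A selector is evaluated against message headers and properties only, never against the body. A condition such as amount > 1000 therefore works only if the producer copies the amount into a message property. The sketch below evaluates the first selector above by hand against a property map. `SelectorSketch` is a hypothetical name, and the code imitates selector semantics in plain Java rather than calling any broker API.

```java
import java.util.Map;

// Hypothetical sketch: evaluates the selector "priority > 5 AND region = 'EUROPE'" by hand.
final class SelectorSketch {

    // Selectors see only headers and properties, so the input is a property map,
    // not an Order object.
    static boolean matchesHighPriorityEurope(Map<String, Object> properties) {
        Object priority = properties.get("priority");
        Object region = properties.get("region");
        // A missing property never satisfies a comparison, similar to selector NULL handling.
        boolean highPriority = priority instanceof Integer && (Integer) priority > 5;
        return highPriority && "EUROPE".equals(region);
    }
}
```

A message whose properties are priority = 7 and region = 'EUROPE' matches. A message that carries the region but no priority property does not.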

Transaction Management

@Service
@Transactional
public class TransactionalMessageService {
private final JmsTemplate jmsTemplate;
private final OrderRepository orderRepository;
public TransactionalMessageService(JmsTemplate jmsTemplate, 
OrderRepository orderRepository) {
this.jmsTemplate = jmsTemplate;
this.orderRepository = orderRepository;
}
@Transactional(rollbackFor = Exception.class)
public void processOrderTransactionally(Order order) {
try {
// Save to database
orderRepository.save(order);
// Send message within transaction
jmsTemplate.convertAndSend("ORDER.QUEUE", order, message -> {
message.setStringProperty("orderId", order.getId());
message.setJMSType("Order");
return message;
});
// Send notification
jmsTemplate.convertAndSend("NOTIFICATION.QUEUE", 
"Order " + order.getId() + " created");
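// The messages roll back together with the DB save only if the JmsTemplate uses transacted
// sessions (setSessionTransacted(true)) inside this transaction; even then the two commits are
// not atomic unless a JTA/XA transaction manager coordinates both resources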
} catch (Exception e) {
System.err.println("Transaction failed: " + e.getMessage());
throw e;
}
}
// Chained transaction with multiple resources
@JmsListener(destination = "PAYMENT.QUEUE")
@Transactional
public void processPaymentAndUpdateOrder(TextMessage message) throws JMSException {
String orderId = message.getStringProperty("orderId");
String paymentStatus = message.getText();
// Update order status in database
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new RuntimeException("Order not found"));
order.setStatus(paymentStatus.equals("SUCCESS") ? "PAID" : "PAYMENT_FAILED");
orderRepository.save(order);
// Send notification based on payment result
if (paymentStatus.equals("SUCCESS")) {
jmsTemplate.convertAndSend("NOTIFICATION.QUEUE", 
"Payment successful for order: " + orderId);
} else {
jmsTemplate.convertAndSend("NOTIFICATION.QUEUE", 
"Payment failed for order: " + orderId);
}
}
}

Error Handling and Retry Mechanisms

@Component
public class ErrorHandlingMessageProcessor {
private static final String DEAD_LETTER_QUEUE = "DLQ.ORDER.QUEUE";
private static final int MAX_RETRY_ATTEMPTS = 3;
@JmsListener(destination = "ORDER.QUEUE")
public void processWithErrorHandling(Order order, Message message) throws JMSException {
try {
System.out.println("Processing order: " + order.getId());
// Simulate business logic that might fail
processOrderBusinessLogic(order);
} catch (BusinessException e) {
handleProcessingError(order, message, e);
} catch (Exception e) {
System.err.println("Unexpected error processing order: " + e.getMessage());
moveToDeadLetterQueue(order, message, e.getMessage());
}
}
private void processOrderBusinessLogic(Order order) throws BusinessException {
// Complex business logic that might throw exceptions
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new BusinessException("Invalid order amount");
}
if (order.getCustomerId() == null || order.getCustomerId().trim().isEmpty()) {
throw new BusinessException("Missing customer ID");
}
// Simulate processing
System.out.println("Order processed successfully: " + order.getId());
}
private void handleProcessingError(Order order, Message message, BusinessException e) 
throws JMSException {
int retryCount = message.getIntProperty("JMSXDeliveryCount") - 1;
if (retryCount < MAX_RETRY_ATTEMPTS) {
System.out.println("Retry attempt " + (retryCount + 1) + 
" for order: " + order.getId());
// Message will be redelivered for retry
throw new JMSException("Retry needed: " + e.getMessage());
} else {
System.err.println("Max retries exceeded for order: " + order.getId());
moveToDeadLetterQueue(order, message, e.getMessage());
}
}
private void moveToDeadLetterQueue(Order order, Message originalMessage, String error) {
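// In a real scenario, send the message to the DLQ here; the broker also moves a message to
// ActiveMQ.DLQ by itself once its redelivery policy is exhausted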
System.err.println("Moving order " + order.getId() + " to DLQ. Error: " + error);
// jmsTemplate.convertAndSend(DEAD_LETTER_QUEUE, order, message -> {
//     message.setStringProperty("originalMessageId", originalMessage.getJMSMessageID());
//     message.setStringProperty("errorReason", error);
//     message.setLongProperty("failedTimestamp", System.currentTimeMillis());
//     return message;
// });
}
}
class BusinessException extends Exception {
public BusinessException(String message) {
super(message);
}
}
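
The retry decision in handleProcessingError depends only on the delivery count, so it can be checked in isolation. The sketch below restates that rule. `RetryDecisionSketch` and its `Action` enum are hypothetical names for this illustration; the arithmetic follows the listing above, where JMSXDeliveryCount is 1 on the first delivery.

```java
// Hypothetical restatement of the retry rule used in handleProcessingError.
final class RetryDecisionSketch {

    enum Action { RETRY, DEAD_LETTER }

    // deliveryCount is the JMSXDeliveryCount value: 1 on first delivery, 2 on first redelivery, ...
    static Action decide(int deliveryCount, int maxRetryAttempts) {
        int retriesSoFar = deliveryCount - 1;
        return retriesSoFar < maxRetryAttempts ? Action.RETRY : Action.DEAD_LETTER;
    }
}
```

With a limit of three retries, deliveries one through three still ask for a retry, and the fourth delivery is routed to the dead letter queue.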

Monitoring and Management

Message Monitoring Utility

@Component
public class JmsMonitor {
private final ConnectionFactory connectionFactory;
public JmsMonitor(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
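public void monitorQueueStats(String queueName) throws JMSException {
// try/finally rather than try-with-resources: in JMS 1.1, Connection and Session are not AutoCloseable
Connection connection = connectionFactory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> messages = browser.getEnumeration();
int messageCount = 0;
while (messages.hasMoreElements()) {
messages.nextElement();
messageCount++;
}
browser.close();
// ActiveMQ limits how many messages a browser returns (maxBrowsePageSize),
// so treat the count as a lower bound on deep queues
System.out.println("Queue: " + queueName + " has at least " + messageCount + " pending messages");
} catch (JMSException e) {
System.err.println("Error monitoring queue: " + e.getMessage());
throw e;
} finally {
connection.close(); // Also closes the session created from it
}
}
public void checkBrokerStatus() {
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
System.out.println("ActiveMQ broker is connected and running");
} catch (JMSException e) {
System.err.println("Cannot connect to ActiveMQ broker: " + e.getMessage());
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException ignored) {
// Nothing useful to do if closing fails
}
}
}
}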
}
// Performance metrics
@Component
public class JmsMetrics {
private final MeterRegistry meterRegistry;
private final Counter messagesSent;
private final Counter messagesReceived;
private final Timer processingTimer;
public JmsMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.messagesSent = Counter.builder("jms.messages.sent")
.description("Total JMS messages sent")
.register(meterRegistry);
this.messagesReceived = Counter.builder("jms.messages.received")
.description("Total JMS messages received")
.register(meterRegistry);
this.processingTimer = Timer.builder("jms.message.processing.time")
.description("JMS message processing time")
.register(meterRegistry);
}
public void recordMessageSent() {
messagesSent.increment();
}
public void recordMessageReceived() {
messagesReceived.increment();
}
public void recordProcessingTime(long duration) {
processingTimer.record(duration, TimeUnit.MILLISECONDS);
}
}

Testing JMS Applications

JMS Test Configuration

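// @TestConfiguration registers these beans; pull it into a test with @Import(JmsTestConfig.class)
// The vm:// transport starts an embedded broker and needs activemq-broker on the test classpath
@TestConfiguration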
public class JmsTestConfig {
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(connectionFactory());
}
}
// Integration tests
@SpringBootTest
class ActiveMQIntegrationTest {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private QueueMessageProducer messageProducer;
@Test
void testSendAndReceiveMessage() throws JMSException {
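// Given
String testMessage = "Test Message";
String queueName = "ORDER.QUEUE"; // sendOrderMessage sends to ORDER.QUEUE
jmsTemplate.setReceiveTimeout(5000); // Return null instead of blocking forever if nothing arrives
// When
messageProducer.sendOrderMessage("test123", testMessage);
// Then
Message receivedMessage = jmsTemplate.receive(queueName);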
assertThat(receivedMessage).isInstanceOf(TextMessage.class);
assertThat(((TextMessage) receivedMessage).getText()).isEqualTo(testMessage);
}
@Test
void testOrderObjectSerialization() {
// Given
Order order = new Order("test123", "customer456", new BigDecimal("99.99"));
// When
messageProducer.sendOrderObject(order);
// Then
Order receivedOrder = (Order) jmsTemplate.receiveAndConvert("ORDER.QUEUE");
assertThat(receivedOrder.getId()).isEqualTo("test123");
assertThat(receivedOrder.getCustomerId()).isEqualTo("customer456");
}
}
// Mock-based unit tests
@ExtendWith(MockitoExtension.class)
class MessageProcessorTest {
@Mock
private JmsTemplate jmsTemplate;
@InjectMocks
private QueueMessageProducer messageProducer;
@Test
void testMessageSending() {
// When
messageProducer.sendOrderMessage("123", "Test Order");
// Then
verify(jmsTemplate).send(eq("ORDER.QUEUE"), any(MessageCreator.class));
}
}

Best Practices and Patterns

Production-Ready Implementation

@Component
public class ProductionReadyJmsService {
private final JmsTemplate jmsTemplate;
private final JmsMetrics metrics;
public ProductionReadyJmsService(JmsTemplate jmsTemplate, JmsMetrics metrics) {
this.jmsTemplate = jmsTemplate;
this.metrics = metrics;
}
// Async message sending with completion callback
public CompletableFuture<String> sendOrderAsync(Order order) {
return CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
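// Delivery mode, priority and TTL are producer-level QoS: set them on the JmsTemplate
// (setDeliveryPersistent, setPriority, setTimeToLive, with explicit QoS enabled).
// Header values set on the message itself are overwritten when it is sent, and
// JMSExpiration is an absolute timestamp rather than a TTL.
jmsTemplate.convertAndSend("ORDER.QUEUE", order, message -> {
message.setStringProperty("orderId", order.getId());
message.setJMSCorrelationID(order.getId());
return message;
});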
metrics.recordMessageSent();
return "Order sent successfully: " + order.getId();
} catch (Exception e) {
metrics.recordMessageSent(); // Still count attempted sends
throw new RuntimeException("Failed to send order message", e);
} finally {
long duration = System.currentTimeMillis() - startTime;
metrics.recordProcessingTime(duration);
}
});
}
// Batch message processing
public void processMessageBatch(List<Order> orders) {
jmsTemplate.execute(session -> {
MessageProducer producer = session.createProducer(
session.createQueue("ORDER.QUEUE"));
try {
// Delivery mode is a producer setting; a value set on each message would be overwritten
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (Order order : orders) {
ObjectMessage message = session.createObjectMessage(order);
message.setStringProperty("orderId", order.getId());
producer.send(message);
metrics.recordMessageSent();
}
return null;
} finally {
producer.close();
}
});
}
// Connection resilience with retry (requires spring-retry and @EnableRetry)
// JmsTemplate throws Spring's unchecked JmsException, not javax.jms.JMSException,
// so that is the type to retry on
@Retryable(value = JmsException.class, maxAttempts = 3, 
backoff = @Backoff(delay = 1000, multiplier = 2))
public void sendWithRetry(Order order) {
jmsTemplate.convertAndSend("ORDER.QUEUE", order);
metrics.recordMessageSent();
}
@Recover
public void sendRecovery(JmsException e, Order order) {
System.err.println("Failed to send order after retries: " + order.getId());
// Log to file, send to alternative system, etc.
}
}
// Configuration for production
@Configuration
public class ProductionJmsConfig {
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = 
new ActiveMQConnectionFactory("failover:(tcp://primary:61616,tcp://secondary:61616)");
// Production settings
connectionFactory.setUseAsyncSend(true); // Higher throughput, but send failures surface asynchronously
connectionFactory.setAlwaysSyncSend(false);
connectionFactory.setProducerWindowSize(1024000);
connectionFactory.setUseCompression(true);
connectionFactory.setCopyMessageOnSend(false);
// Redelivery policy
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return new PooledConnectionFactory(connectionFactory);
}
}
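
The redelivery settings above describe an exponential backoff: an initial delay of 1000 ms that doubles on each further attempt, for at most three redeliveries. The sketch below computes such a schedule. It is a simplified model of exponential backoff, not ActiveMQ's exact algorithm (the broker client also applies settings such as a maximum delay and collision avoidance), and `BackoffScheduleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an exponential backoff schedule.
final class BackoffScheduleSketch {

    // Returns the delay in milliseconds before each redelivery attempt.
    static List<Long> delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        List<Long> schedule = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            schedule.add(delay);
            delay = (long) (delay * multiplier); // Grow the delay for the next attempt
        }
        return schedule;
    }
}
```

For the values used in the configuration, `delays(1000, 2.0, 3)` yields 1000, 2000 and 4000 ms, so a message that keeps failing is given up on after roughly seven seconds of waiting.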

Conclusion

ActiveMQ's JMS implementation provides a robust, enterprise-ready messaging solution with:

  • Reliable messaging with persistence and transactions
  • Flexible messaging models (Queue P2P, Topic Pub-Sub)
  • Advanced features like selectors, TTL, priorities
  • Spring integration for simplified development
  • Production features such as connection pooling, failover, and monitoring

Key best practices include:

  • Use connection pooling for performance
  • Implement proper error handling and retry mechanisms
  • Monitor queue depths and broker health
  • Use appropriate message persistence and TTL settings
  • Implement comprehensive testing strategies

This implementation enables building scalable, decoupled, and resilient distributed systems using industry-standard JMS messaging patterns.
