Overview
Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics. It provides reliable cloud messaging as a service (MaaS) with support for structured message passing, durability, and transactional delivery.
Architecture
Core Components
- Queues: Point-to-point messaging
- Topics: Publish-subscribe with multiple subscriptions
- Relays: Hybrid connectivity (provided by the separate Azure Relay service; not covered further in this guide)
- Namespaces: Container for all messaging components
Dependencies
<dependencies>
    <!-- Azure Service Bus -->
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-servicebus</artifactId>
        <version>7.15.0</version>
    </dependency>

    <!-- Azure Identity -->
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-identity</artifactId>
        <version>1.9.2</version>
    </dependency>

    <!-- Spring Boot Starters -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Web (required by the REST controller shown later in this guide) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Configuration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Lombok (the examples use @Data and @Slf4j) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Core Implementation
1. Configuration Classes
/**
 * Type-safe view of the {@code azure.servicebus.*} configuration keys.
 *
 * <p>This class is intentionally not a {@code @Configuration}/{@code @Component}: it is
 * registered as a bean through
 * {@code @EnableConfigurationProperties(ServiceBusProperties.class)}. Exposing it to
 * component scanning as well would register a second bean of the same type and make
 * by-type injection of {@code ServiceBusProperties} ambiguous.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "azure.servicebus")
@Data
public class ServiceBusProperties {

    /** Connection string handed to {@code ServiceBusClientBuilder.connectionString(...)}. */
    private String connectionString;

    /** Service Bus namespace name. */
    private String namespace;

    /** Name of the queue that senders, processors and receivers are bound to. */
    private String queueName;

    /** Name of the topic that the topic sender and topic processor are bound to. */
    private String topicName;

    /** Name of the subscription the topic processor reads from. */
    private String subscriptionName;

    /** Value passed to the processors' {@code maxConcurrentCalls}; defaults to 1. */
    private int maxConcurrentCalls = 1;

    /** Value passed to the processors' {@code prefetchCount}; defaults to 0. */
    private int prefetchCount = 0;

    /** Value passed to {@code maxAutoLockRenewDuration}; defaults to five minutes. */
    private Duration maxAutoLockRenewDuration = Duration.ofMinutes(5);
}
/**
 * Declares the shared {@code ServiceBusClientBuilder} and the sender and processor clients
 * that are derived from it.
 *
 * <p>Two beans exist for each client type (queue and topic), so consumers have to select
 * the one they want by bean name.
 */
@Configuration
@EnableConfigurationProperties(ServiceBusProperties.class)
public class ServiceBusConfig {

    /**
     * Creates the builder every client in this configuration is built from.
     *
     * @param properties bound {@code azure.servicebus.*} settings
     * @return a builder configured with the connection string
     */
    @Bean
    @Primary
    public ServiceBusClientBuilder serviceBusClientBuilder(ServiceBusProperties properties) {
        return new ServiceBusClientBuilder()
                .connectionString(properties.getConnectionString());
    }

    /** Synchronous sender bound to the configured queue. */
    @Bean
    public ServiceBusSenderClient queueSenderClient(
            ServiceBusClientBuilder builder,
            ServiceBusProperties properties) {
        return builder
                .sender()
                .queueName(properties.getQueueName())
                .buildClient();
    }

    /** Synchronous sender bound to the configured topic. */
    @Bean
    public ServiceBusSenderClient topicSenderClient(
            ServiceBusClientBuilder builder,
            ServiceBusProperties properties) {
        return builder
                .sender()
                .topicName(properties.getTopicName())
                .buildClient();
    }

    /**
     * Processor that feeds messages from the configured queue to {@code messageHandler}.
     * The processor is only built here; it is not started by this method.
     */
    @Bean
    public ServiceBusProcessorClient queueProcessorClient(
            ServiceBusClientBuilder builder,
            ServiceBusProperties properties,
            QueueMessageHandler messageHandler) {
        return builder
                .processor()
                .queueName(properties.getQueueName())
                .processMessage(messageHandler::processMessage)
                .processError(messageHandler::processError)
                // The handler completes or abandons every message itself, so the processor's
                // implicit complete/abandon is switched off to give settlement a single owner.
                .disableAutoComplete()
                .maxConcurrentCalls(properties.getMaxConcurrentCalls())
                .prefetchCount(properties.getPrefetchCount())
                .maxAutoLockRenewDuration(properties.getMaxAutoLockRenewDuration())
                .buildProcessorClient();
    }

    /**
     * Processor that feeds messages from the configured topic subscription to
     * {@code messageHandler}. The processor is only built here; it is not started by this method.
     */
    @Bean
    public ServiceBusProcessorClient topicProcessorClient(
            ServiceBusClientBuilder builder,
            ServiceBusProperties properties,
            TopicMessageHandler messageHandler) {
        return builder
                .processor()
                .topicName(properties.getTopicName())
                .subscriptionName(properties.getSubscriptionName())
                .processMessage(messageHandler::processMessage)
                .processError(messageHandler::processError)
                // Same reasoning as for the queue processor: the handler settles explicitly.
                .disableAutoComplete()
                .maxConcurrentCalls(properties.getMaxConcurrentCalls())
                .prefetchCount(properties.getPrefetchCount())
                // Kept in line with the queue processor so both honour the configured value.
                .maxAutoLockRenewDuration(properties.getMaxAutoLockRenewDuration())
                .buildProcessorClient();
    }
}
2. Core Service Bus Service
/**
 * Facade for sending JSON-serialized messages to the configured queue and topic, and for
 * starting and stopping the two message processors with the bean lifecycle.
 *
 * <p>Every outgoing message is given a message ID when it does not carry one, so the ID
 * returned to the caller is never {@code null}.
 *
 * <p>NOTE(review): outgoing messages carry no content type, so a consumer that routes on
 * {@code application/json} will not recognise them as JSON — confirm this is intended.
 */
@Service
@Slf4j
public class ServiceBusService {

    private final ServiceBusSenderClient queueSender;
    private final ServiceBusSenderClient topicSender;
    private final ServiceBusProcessorClient queueProcessor;
    private final ServiceBusProcessorClient topicProcessor;
    private final ObjectMapper objectMapper;

    /**
     * The parameter names deliberately equal the bean names declared in
     * {@code ServiceBusConfig}. Two beans exist for each client type, and Spring falls back
     * to the parameter name to choose between equally typed candidates (this needs parameter
     * names in the bytecode, i.e. compilation with {@code -parameters}).
     */
    public ServiceBusService(ServiceBusSenderClient queueSenderClient,
                             ServiceBusSenderClient topicSenderClient,
                             ServiceBusProcessorClient queueProcessorClient,
                             ServiceBusProcessorClient topicProcessorClient) {
        this.queueSender = queueSenderClient;
        this.topicSender = topicSenderClient;
        this.queueProcessor = queueProcessorClient;
        this.topicProcessor = topicProcessorClient;
        this.objectMapper = new ObjectMapper();
    }

    /** Starts both processors once the bean has been constructed. */
    @PostConstruct
    public void startProcessors() {
        queueProcessor.start();
        topicProcessor.start();
        log.info("Service Bus processors started");
    }

    /** Closes both processors on shutdown; the topic processor is closed even if the queue one fails. */
    @PreDestroy
    public void stopProcessors() {
        try {
            queueProcessor.close();
        } finally {
            topicProcessor.close();
        }
        log.info("Service Bus processors stopped");
    }

    // Queue Operations

    /**
     * Serializes {@code message} to JSON and sends it to the queue immediately.
     *
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendToQueue(Object message) {
        return sendToQueue(message, null);
    }

    /**
     * Serializes {@code message} to JSON and sends it to the queue.
     *
     * @param scheduledEnqueueTime delay from now after which the message becomes available,
     *                             or {@code null} to enqueue immediately
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendToQueue(Object message, Duration scheduledEnqueueTime) {
        return sendJson(queueSender, "queue", message, scheduledEnqueueTime);
    }

    /**
     * Sends a pre-built message to the queue. A message ID is assigned to {@code message}
     * when it has none.
     *
     * @return the ID of the sent message
     * @throws ServiceBusException if sending fails
     */
    public String sendToQueue(ServiceBusMessage message) {
        try {
            ensureMessageId(message);
            queueSender.sendMessage(message);
            log.debug("Message sent to queue with ID: {}", message.getMessageId());
            return message.getMessageId();
        } catch (Exception e) {
            throw new ServiceBusException("Failed to send message to queue", e);
        }
    }

    // Topic Operations

    /**
     * Serializes {@code message} to JSON and publishes it to the topic immediately.
     *
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendToTopic(Object message) {
        return sendToTopic(message, null);
    }

    /**
     * Serializes {@code message} to JSON and publishes it to the topic.
     *
     * @param scheduledEnqueueTime delay from now after which the message becomes available,
     *                             or {@code null} to enqueue immediately
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendToTopic(Object message, Duration scheduledEnqueueTime) {
        return sendJson(topicSender, "topic", message, scheduledEnqueueTime);
    }

    // Batch Operations

    /**
     * Sends {@code messages} to the queue in as few batches as possible.
     *
     * <p>Batches are flushed as they fill up, so when this method fails part-way the
     * batches flushed before the failure have already been sent.
     *
     * @throws ServiceBusException if serialization or sending fails, or if a single message
     *                             does not fit into an empty batch
     */
    public void sendBatchToQueue(List<?> messages) {
        try {
            ServiceBusMessageBatch batch = queueSender.createMessageBatch();
            for (Object message : messages) {
                ServiceBusMessage serviceBusMessage =
                        newMessage(objectMapper.writeValueAsString(message));
                if (batch.tryAddMessage(serviceBusMessage)) {
                    continue;
                }
                // The current batch is full: flush it and retry in a fresh one. An empty batch
                // that rejects the message means the message itself is too large.
                if (batch.getCount() > 0) {
                    queueSender.sendMessages(batch);
                    batch = queueSender.createMessageBatch();
                    if (batch.tryAddMessage(serviceBusMessage)) {
                        continue;
                    }
                }
                throw new IllegalArgumentException(
                        "Message " + serviceBusMessage.getMessageId()
                                + " is too large to fit into an empty batch");
            }
            if (batch.getCount() > 0) {
                queueSender.sendMessages(batch);
            }
            log.debug("Batch of {} messages sent to queue", messages.size());
        } catch (Exception e) {
            throw new ServiceBusException("Failed to send batch to queue", e);
        }
    }

    // Advanced Message Properties

    /**
     * Sends {@code message} to the queue with the given application properties attached.
     *
     * @param properties application properties to copy onto the message; may be {@code null}
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendMessageWithProperties(Object message, Map<String, Object> properties) {
        try {
            ServiceBusMessage serviceBusMessage =
                    newMessage(objectMapper.writeValueAsString(message));
            if (properties != null) {
                serviceBusMessage.getApplicationProperties().putAll(properties);
            }
            queueSender.sendMessage(serviceBusMessage);
            return serviceBusMessage.getMessageId();
        } catch (Exception e) {
            throw new ServiceBusException("Failed to send message with properties", e);
        }
    }

    // Session-based Messaging

    /**
     * Sends {@code message} to the queue tagged with {@code sessionId}.
     *
     * @return the ID of the sent message
     * @throws ServiceBusException if serialization or sending fails
     */
    public String sendSessionMessage(Object message, String sessionId) {
        try {
            ServiceBusMessage serviceBusMessage =
                    newMessage(objectMapper.writeValueAsString(message));
            serviceBusMessage.setSessionId(sessionId);
            queueSender.sendMessage(serviceBusMessage);
            log.debug("Session message sent with session ID: {}", sessionId);
            return serviceBusMessage.getMessageId();
        } catch (Exception e) {
            throw new ServiceBusException("Failed to send session message", e);
        }
    }

    /** Shared implementation of the JSON send operations for queue and topic. */
    private String sendJson(ServiceBusSenderClient sender, String destination,
                            Object payload, Duration scheduledEnqueueTime) {
        try {
            String messageBody = objectMapper.writeValueAsString(payload);
            ServiceBusMessage serviceBusMessage = newMessage(messageBody);
            if (scheduledEnqueueTime != null) {
                serviceBusMessage.setScheduledEnqueueTime(
                        OffsetDateTime.now().plus(scheduledEnqueueTime));
            }
            sender.sendMessage(serviceBusMessage);
            log.debug("Message sent to {}: {}", destination, messageBody);
            return serviceBusMessage.getMessageId();
        } catch (Exception e) {
            throw new ServiceBusException("Failed to send message to " + destination, e);
        }
    }

    /** Wraps {@code body} in a message that is guaranteed to carry a message ID. */
    private static ServiceBusMessage newMessage(String body) {
        ServiceBusMessage serviceBusMessage = new ServiceBusMessage(body);
        ensureMessageId(serviceBusMessage);
        return serviceBusMessage;
    }

    /** Assigns a random UUID as message ID unless the message already has one. */
    private static void ensureMessageId(ServiceBusMessage message) {
        String messageId = message.getMessageId();
        if (messageId == null || messageId.isEmpty()) {
            message.setMessageId(UUID.randomUUID().toString());
        }
    }
}
3. Message Handlers
/**
 * Callback target for the queue processor: routes each received message to a type-specific
 * handler, then completes it on success or abandons it on failure.
 */
@Component
@Slf4j
public class QueueMessageHandler {

    private static final String JSON_CONTENT_TYPE = "application/json";
    private static final String MESSAGE_TYPE_PROPERTY = "MessageType";

    private final ObjectMapper objectMapper;
    private final MetricsService metricsService;

    public QueueMessageHandler(ObjectMapper objectMapper, MetricsService metricsService) {
        this.objectMapper = objectMapper;
        this.metricsService = metricsService;
    }

    /**
     * Processes one queue message and settles it.
     *
     * <p>Only a failure of the processing step leads to {@code abandon()}. Completion and
     * metric recording happen outside the try block so that an error in either of them is
     * not mistaken for a processing failure and answered with an abandon on a message that
     * has already been settled.
     *
     * @param context the received message together with its settlement operations
     */
    public void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        try {
            String messageBody = message.getBody().toString();
            log.info("Processing queue message: {}", messageBody);
            // Process message based on content type or application properties
            processMessageContent(message);
        } catch (Exception e) {
            log.error("Failed to process queue message: {}", message.getMessageId(), e);
            // Abandon the message for retry
            context.abandon();
            metricsService.recordMessageProcessed("queue", false);
            return;
        }
        // Complete the message
        context.complete();
        metricsService.recordMessageProcessed("queue", true);
    }

    /** Logs an error reported by the processor and records it in the metrics. */
    public void processError(ServiceBusErrorContext errorContext) {
        log.error("Error occurred in queue processor: {}",
                errorContext.getException().getMessage(),
                errorContext.getException());
        metricsService.recordError("queue", errorContext.getErrorSource());
    }

    /**
     * Chooses the handler for {@code message}: JSON content type first, then the
     * {@code MessageType} application property, otherwise the generic handler.
     */
    private void processMessageContent(ServiceBusReceivedMessage message) {
        if (isJson(message.getContentType())) {
            processJsonMessage(message);
        } else if (message.getApplicationProperties().containsKey(MESSAGE_TYPE_PROPERTY)) {
            // String.valueOf tolerates a non-String or null property value; such values end up
            // in the "unknown type" branch instead of failing with a cast or null error.
            String messageType =
                    String.valueOf(message.getApplicationProperties().get(MESSAGE_TYPE_PROPERTY));
            processTypedMessage(message, messageType);
        } else {
            processGenericMessage(message);
        }
    }

    /**
     * Tells whether {@code contentType} denotes JSON, ignoring case and any parameters such
     * as {@code ; charset=utf-8}.
     */
    private static boolean isJson(String contentType) {
        if (contentType == null) {
            return false;
        }
        int parametersStart = contentType.indexOf(';');
        String mediaType = parametersStart >= 0
                ? contentType.substring(0, parametersStart)
                : contentType;
        return JSON_CONTENT_TYPE.equalsIgnoreCase(mediaType.trim());
    }

    /**
     * Parses the body as JSON and dispatches on its {@code type} field.
     *
     * @throws MessageProcessingException if the body cannot be parsed or handled
     */
    private void processJsonMessage(ServiceBusReceivedMessage message) {
        try {
            String messageBody = message.getBody().toString();
            JsonNode jsonNode = objectMapper.readTree(messageBody);
            if (!jsonNode.has("type")) {
                // Make the otherwise silent no-op visible.
                log.warn("JSON message {} has no 'type' field; nothing to process",
                        message.getMessageId());
                return;
            }
            // Handle different message types
            String type = jsonNode.get("type").asText();
            switch (type) {
                case "OrderCreated":
                    OrderCreatedEvent orderEvent =
                            objectMapper.treeToValue(jsonNode, OrderCreatedEvent.class);
                    handleOrderCreated(orderEvent);
                    break;
                case "UserRegistered":
                    UserRegisteredEvent userEvent =
                            objectMapper.treeToValue(jsonNode, UserRegisteredEvent.class);
                    handleUserRegistered(userEvent);
                    break;
                default:
                    log.warn("Unknown message type: {}", type);
            }
        } catch (Exception e) {
            throw new MessageProcessingException("Failed to process JSON message", e);
        }
    }

    /** Dispatches on the message type taken from the application properties. */
    private void processTypedMessage(ServiceBusReceivedMessage message, String messageType) {
        switch (messageType) {
            case "Notification":
                processNotificationMessage(message);
                break;
            case "Audit":
                processAuditMessage(message);
                break;
            default:
                log.warn("Unknown message type: {}", messageType);
        }
    }

    /** Default processing for messages without a recognised type: logs the body. */
    private void processGenericMessage(ServiceBusReceivedMessage message) {
        log.info("Processing generic message: {}", message.getBody().toString());
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        log.info("Processing order created event: {}", event.getOrderId());
        // Business logic for order creation
    }

    private void handleUserRegistered(UserRegisteredEvent event) {
        log.info("Processing user registered event: {}", event.getUserId());
        // Business logic for user registration
    }

    private void processNotificationMessage(ServiceBusReceivedMessage message) {
        // Process notification message
    }

    private void processAuditMessage(ServiceBusReceivedMessage message) {
        // Process audit message
    }
}
/**
 * Callback target for the topic processor: routes each received message by the subscription
 * it arrived on, then completes it on success or abandons it on failure.
 */
@Component
@Slf4j
public class TopicMessageHandler {

    private final ObjectMapper objectMapper;

    public TopicMessageHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Processes one topic message and settles it.
     *
     * <p>Only a failure of the processing step leads to {@code abandon()}; completion runs
     * outside the try block so that an error while completing is not answered with an
     * abandon on the same message.
     *
     * @param context the received message together with its settlement operations
     */
    public void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        try {
            String messageBody = message.getBody().toString();
            String subscription = subscriptionNameOf(context);
            log.info("Processing topic message from subscription {}: {}",
                    subscription, messageBody);
            // Process topic message
            processTopicMessage(message, subscription);
        } catch (Exception e) {
            log.error("Failed to process topic message: {}", message.getMessageId(), e);
            context.abandon();
            return;
        }
        context.complete();
    }

    /** Logs an error reported by the processor. */
    public void processError(ServiceBusErrorContext errorContext) {
        log.error("Error occurred in topic processor: {}",
                errorContext.getException().getMessage(),
                errorContext.getException());
    }

    /**
     * Derives the subscription name from the last segment of the context's entity path.
     *
     * <p>The session ID is not used for this: it identifies a message session, not a
     * subscription, and is {@code null} on subscriptions without sessions, which would make
     * the routing {@code switch} fail for every message.
     *
     * <p>NOTE(review): assumes the entity path of a subscription receiver has the form
     * {@code <topic>/subscriptions/<subscription>} — confirm against the SDK version in use.
     *
     * @return the subscription name, or an empty string when no entity path is available
     */
    private static String subscriptionNameOf(ServiceBusReceivedMessageContext context) {
        String entityPath = context.getEntityPath();
        if (entityPath == null) {
            return "";
        }
        int lastSlash = entityPath.lastIndexOf('/');
        return lastSlash >= 0 ? entityPath.substring(lastSlash + 1) : entityPath;
    }

    /** Dispatches on the subscription name; unknown names are logged and otherwise ignored. */
    private void processTopicMessage(ServiceBusReceivedMessage message, String subscription) {
        switch (subscription) {
            case "orders":
                processOrderMessage(message);
                break;
            case "notifications":
                processNotificationMessage(message);
                break;
            case "audit":
                processAuditMessage(message);
                break;
            default:
                log.warn("Unknown subscription: {}", subscription);
        }
    }

    private void processOrderMessage(ServiceBusReceivedMessage message) {
        // Process order-related messages
    }

    private void processNotificationMessage(ServiceBusReceivedMessage message) {
        // Process notification messages
    }

    private void processAuditMessage(ServiceBusReceivedMessage message) {
        // Process audit messages
    }
}
Advanced Features
1. Session-based Processing
@Service
@Slf4j
public class SessionBasedProcessor {
private final ServiceBusClientBuilder clientBuilder;
private final ServiceBusProperties properties;
private final Map<String, ServiceBusSessionProcessor> sessionProcessors;
public SessionBasedProcessor(ServiceBusClientBuilder clientBuilder,
ServiceBusProperties properties) {
this.clientBuilder = clientBuilder;
this.properties = properties;
this.sessionProcessors = new ConcurrentHashMap<>();
}
public void startSessionProcessor(String sessionId) {
if (!sessionProcessors.containsKey(sessionId)) {
ServiceBusSessionProcessor processor = clientBuilder
.sessionProcessor()
.queueName(properties.getQueueName())
.maxConcurrentSessions(1)
.processMessage(this::processSessionMessage)
.processError(this::processSessionError)
.buildSessionProcessor();
processor.start();
sessionProcessors.put(sessionId, processor);
log.info("Started session processor for session: {}", sessionId);
}
}
public void stopSessionProcessor(String sessionId) {
ServiceBusSessionProcessor processor = sessionProcessors.remove(sessionId);
if (processor != null) {
processor.close();
log.info("Stopped session processor for session: {}", sessionId);
}
}
private void processSessionMessage(ServiceBusReceivedMessageContext context) {
ServiceBusReceivedMessage message = context.getMessage();
String sessionId = message.getSessionId();
try {
log.info("Processing session message for session {}: {}",
sessionId, message.getBody().toString());
// Session-specific processing logic
processMessageInSessionContext(message, sessionId);
context.complete();
} catch (Exception e) {
log.error("Failed to process session message for session {}", sessionId, e);
context.abandon();
}
}
private void processSessionError(ServiceBusErrorContext errorContext) {
log.error("Session processor error: {}", errorContext.getException().getMessage());
}
private void processMessageInSessionContext(ServiceBusReceivedMessage message, String sessionId) {
// Implement session-aware processing logic
// Maintain session state, handle sequence, etc.
}
}
2. Dead Letter Queue Management
/**
 * Inspection and monitoring helpers for the dead-letter sub-queue of the configured queue.
 *
 * <p>{@link #resubmitDeadLetterMessage(String)} and {@link #getDLQStatistics()} are
 * placeholders: the former performs no resubmission and the latter returns fixed values.
 */
@Service
@Slf4j
public class DeadLetterQueueService {

    /** Message count above which {@link #monitorDeadLetterQueue()} logs a warning. */
    private static final int DLQ_ALERT_THRESHOLD = 100;

    private final ServiceBusClientBuilder clientBuilder;
    private final ServiceBusProperties properties;

    public DeadLetterQueueService(ServiceBusClientBuilder clientBuilder,
                                  ServiceBusProperties properties) {
        this.clientBuilder = clientBuilder;
        this.properties = properties;
    }

    /**
     * Returns up to {@code maxMessages} messages from the dead-letter queue without removing
     * or locking them.
     *
     * <p>The messages are peeked rather than received. A receive would lock each message
     * for a receiver that is closed before this method returns, which hides the messages
     * from other readers until their locks expire even though nothing settles them.
     *
     * @param maxMessages maximum number of messages to return; values below 1 yield an empty list
     * @return the messages read; empty (or partial) when reading fails, in which case the
     *         failure is logged
     */
    public List<ServiceBusReceivedMessage> readDeadLetterMessages(int maxMessages) {
        List<ServiceBusReceivedMessage> deadLetterMessages = new ArrayList<>();
        if (maxMessages <= 0) {
            return deadLetterMessages;
        }
        try (ServiceBusReceiverClient receiverClient = newDeadLetterReceiver()) {
            for (ServiceBusReceivedMessage message : receiverClient.peekMessages(maxMessages)) {
                deadLetterMessages.add(message);
                log.info("Retrieved DLQ message: {}, Reason: {}",
                        message.getMessageId(),
                        message.getDeadLetterReason());
            }
        } catch (Exception e) {
            // Best effort: callers get whatever was read before the failure.
            log.error("Failed to read DLQ messages", e);
        }
        return deadLetterMessages;
    }

    /**
     * Placeholder for resubmitting a dead-lettered message. It currently opens and closes a
     * dead-letter receiver and does nothing else; no message is moved.
     *
     * @param messageId ID of the message that should be resubmitted
     */
    public void resubmitDeadLetterMessage(String messageId) {
        try (ServiceBusReceiverClient receiverClient = newDeadLetterReceiver()) {
            // This is a simplified approach - in practice, you'd need to track message IDs
            // or use a different strategy for message resubmission
        } catch (Exception e) {
            log.error("Failed to resubmit DLQ message: {}", messageId, e);
        }
    }

    /**
     * Returns dead-letter queue statistics. The values are hard-coded mock data; a real
     * implementation would typically query the Azure management APIs.
     */
    public DeadLetterQueueStatistics getDLQStatistics() {
        return new DeadLetterQueueStatistics(10, Instant.now().minus(Duration.ofDays(1)));
    }

    /** Checks the dead-letter queue size and logs a warning when it exceeds the threshold. */
    @Scheduled(fixedDelay = 300000) // Every 5 minutes
    public void monitorDeadLetterQueue() {
        try {
            DeadLetterQueueStatistics stats = getDLQStatistics();
            if (stats.getMessageCount() > DLQ_ALERT_THRESHOLD) {
                log.warn("DLQ has {} messages, investigation required", stats.getMessageCount());
                // Trigger alert or automatic processing
            }
        } catch (Exception e) {
            log.error("Failed to monitor DLQ", e);
        }
    }

    /** Builds a receiver bound to the dead-letter sub-queue of the configured queue. */
    private ServiceBusReceiverClient newDeadLetterReceiver() {
        return clientBuilder
                .receiver()
                .queueName(properties.getQueueName())
                .subQueue(SubQueue.DEAD_LETTER_QUEUE)
                .buildClient();
    }
}
3. Advanced Message Patterns
/**
 * Examples of request-reply and deferred-message handling on top of one sender and one
 * receiver client.
 *
 * <p>NOTE(review): the configuration shown in this guide declares two
 * {@code ServiceBusSenderClient} beans and no {@code ServiceBusReceiverClient} bean —
 * confirm how the two constructor arguments are meant to be supplied.
 */
@Service
@Slf4j
public class AdvancedMessagingPatterns {

    /** Reused for every serialization instead of building a mapper per call. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Longest single wait for a reply before the overall deadline is checked again. */
    private static final Duration REPLY_POLL_INTERVAL = Duration.ofSeconds(1);

    private final ServiceBusSenderClient senderClient;
    private final ServiceBusReceiverClient receiverClient;

    public AdvancedMessagingPatterns(ServiceBusSenderClient senderClient,
                                     ServiceBusReceiverClient receiverClient) {
        this.senderClient = senderClient;
        this.receiverClient = receiverClient;
    }

    // Request-Reply Pattern

    /**
     * Sends {@code request} as JSON with a fresh correlation ID and waits for a reply
     * carrying the same correlation ID.
     *
     * @param request payload to serialize and send
     * @param replyTo value stored in the request's reply-to field
     * @param timeout time-to-live of the request and maximum time to wait for the reply
     * @return a future holding the reply body, or failing with a {@code TimeoutException}
     *         as cause when no matching reply arrives in time
     */
    public CompletableFuture<String> sendRequestAndWaitForReply(
            Object request, String replyTo, Duration timeout) {
        String correlationId = UUID.randomUUID().toString();
        String messageBody = serializeMessage(request);

        ServiceBusMessage requestMessage = new ServiceBusMessage(messageBody);
        requestMessage.setMessageId(UUID.randomUUID().toString());
        requestMessage.setCorrelationId(correlationId);
        requestMessage.setReplyTo(replyTo);
        requestMessage.setTimeToLive(timeout);

        senderClient.sendMessage(requestMessage);
        return waitForReply(correlationId, timeout);
    }

    /**
     * Polls the receiver until a message with {@code correlationId} arrives or
     * {@code timeout} elapses.
     *
     * <p>The outcome is published through an explicitly completed future, which allows the
     * timeout to be reported as a (checked) {@code TimeoutException} — that cannot be thrown
     * from a {@code Supplier} lambda. Polling also stops once the caller has completed or
     * cancelled the returned future.
     *
     * <p>Messages whose correlation ID does not match are left unsettled, as before.
     */
    private CompletableFuture<String> waitForReply(String correlationId, Duration timeout) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                Instant endTime = Instant.now().plus(timeout);
                while (!reply.isDone() && Instant.now().isBefore(endTime)) {
                    for (ServiceBusReceivedMessage message
                            : receiverClient.receiveMessages(1, REPLY_POLL_INTERVAL)) {
                        if (correlationId.equals(message.getCorrelationId())) {
                            receiverClient.complete(message);
                            reply.complete(message.getBody().toString());
                            return;
                        }
                    }
                }
                reply.completeExceptionally(new TimeoutException("Reply timeout exceeded"));
            } catch (RuntimeException e) {
                // Surface receiver failures to the caller instead of losing them on the pool thread.
                reply.completeExceptionally(e);
            }
        });
        return reply;
    }

    // Deferred Message Processing

    /**
     * Defers {@code message} and tags it with the instant until which it is deferred.
     * Failures are logged and not propagated.
     *
     * <p>NOTE(review): azure-messaging-servicebus 7.x documents
     * {@code defer(message, DeferOptions)}; confirm that this {@code Map} overload exists
     * in the SDK version in use.
     */
    public void deferMessage(ServiceBusReceivedMessage message, Duration deferDuration) {
        try {
            receiverClient.defer(message,
                    Map.of("DeferredUntil", Instant.now().plus(deferDuration).toString()));
        } catch (Exception e) {
            log.error("Failed to defer message: {}", message.getMessageId(), e);
        }
    }

    /**
     * Simplified example: fetches the deferred message with the fixed sequence number 1 and
     * discards the result. Failures are logged and not propagated.
     */
    public void processDeferredMessages() {
        try {
            // Receive deferred messages using sequence number
            // This is a simplified example
            receiverClient.receiveDeferredMessage(1L); // Sequence number
        } catch (Exception e) {
            log.error("Failed to process deferred messages", e);
        }
    }

    /**
     * Serializes {@code message} to JSON.
     *
     * @throws MessageSerializationException if the object cannot be serialized
     */
    private String serializeMessage(Object message) {
        try {
            return OBJECT_MAPPER.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            throw new MessageSerializationException("Failed to serialize message", e);
        }
    }
}
Spring Boot Integration
1. REST Controller for Message Operations
/**
 * HTTP entry points under {@code /api/messages} that forward payloads to
 * {@code ServiceBusService} and report the outcome as a {@code MessageResponse}.
 * Every send endpoint answers 200 on success and 500 with the exception message on failure.
 */
@RestController
@RequestMapping("/api/messages")
@Slf4j
public class MessageController {

    private final ServiceBusService serviceBusService;
    private final AdvancedMessagingPatterns advancedPatterns;

    public MessageController(ServiceBusService serviceBusService,
                             AdvancedMessagingPatterns advancedPatterns) {
        this.serviceBusService = serviceBusService;
        this.advancedPatterns = advancedPatterns;
    }

    /** Sends the request payload to the queue. */
    @PostMapping("/queue")
    public ResponseEntity<MessageResponse> sendToQueue(@RequestBody MessageRequest request) {
        try {
            String sentId = serviceBusService.sendToQueue(request.getPayload());
            return success(sentId, "Message sent to queue successfully");
        } catch (Exception e) {
            return failure("Failed to send message to queue", "Failed to send message: ", e);
        }
    }

    /** Publishes the request payload to the topic. */
    @PostMapping("/topic")
    public ResponseEntity<MessageResponse> sendToTopic(@RequestBody MessageRequest request) {
        try {
            String sentId = serviceBusService.sendToTopic(request.getPayload());
            return success(sentId, "Message sent to topic successfully");
        } catch (Exception e) {
            return failure("Failed to send message to topic", "Failed to send message: ", e);
        }
    }

    /** Sends the request payload to the queue with the requested delay. */
    @PostMapping("/scheduled")
    public ResponseEntity<MessageResponse> sendScheduledMessage(
            @RequestBody ScheduledMessageRequest request) {
        try {
            String sentId = serviceBusService.sendToQueue(request.getPayload(), request.getDelay());
            return success(sentId, "Scheduled message sent successfully");
        } catch (Exception e) {
            return failure("Failed to send scheduled message",
                    "Failed to send scheduled message: ", e);
        }
    }

    /** Sends all messages of the request to the queue as a batch; the response carries no ID. */
    @PostMapping("/batch")
    public ResponseEntity<MessageResponse> sendBatch(@RequestBody BatchMessageRequest request) {
        try {
            serviceBusService.sendBatchToQueue(request.getMessages());
            int sentCount = request.getMessages().size();
            return success(null, "Batch of " + sentCount + " messages sent successfully");
        } catch (Exception e) {
            return failure("Failed to send batch messages", "Failed to send batch messages: ", e);
        }
    }

    /** Placeholder endpoint: always answers with an empty list. */
    @GetMapping("/dlq")
    public ResponseEntity<List<DeadLetterMessage>> getDeadLetterMessages(
            @RequestParam(defaultValue = "10") int maxMessages) {
        // Implementation would use DeadLetterQueueService
        return ResponseEntity.ok(Collections.emptyList());
    }

    /** Builds the 200 response for a successful operation, stamped with the current instant. */
    private static ResponseEntity<MessageResponse> success(String messageId, String text) {
        MessageResponse body = new MessageResponse(messageId, text, Instant.now());
        return ResponseEntity.ok(body);
    }

    /** Logs the failure and builds the 500 response whose text is the prefix plus the exception message. */
    private static ResponseEntity<MessageResponse> failure(
            String logMessage, String responsePrefix, Exception cause) {
        log.error(logMessage, cause);
        MessageResponse body =
                new MessageResponse(null, responsePrefix + cause.getMessage(), Instant.now());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(body);
    }
}
2. Configuration Properties
# application.yml
azure:
  servicebus:
    connection-string: "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"
    namespace: "your-namespace"
    queue-name: "orders-queue"
    topic-name: "notifications-topic"
    subscription-name: "orders-subscription"
    max-concurrent-calls: 5
    prefetch-count: 10
    max-auto-lock-renew-duration: PT5M

spring:
  jackson:
    date-format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
    time-zone: UTC
Testing
1. Unit Tests
/**
 * Context test for {@code ServiceBusService} with all Service Bus clients replaced by mocks,
 * so no connection to Azure is attempted.
 *
 * <p>NOTE(review): the Service Bus client classes are final in the SDK as far as known;
 * mocking them needs Mockito's inline mock maker — confirm it is active in this build.
 */
@SpringBootTest
@TestPropertySource(properties = {
    "azure.servicebus.connection-string=Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test",
    "azure.servicebus.queue-name=test-queue"
})
class ServiceBusServiceTest {

    @Autowired
    private ServiceBusService serviceBusService;

    // Two beans exist for each client type, so every mock names the bean it replaces.
    @MockBean(name = "queueSenderClient")
    private ServiceBusSenderClient queueSender;

    @MockBean(name = "topicSenderClient")
    private ServiceBusSenderClient topicSender;

    // The processors are mocked as well; otherwise the service would start real processors
    // when the context comes up.
    @MockBean(name = "queueProcessorClient")
    private ServiceBusProcessorClient queueProcessor;

    @MockBean(name = "topicProcessorClient")
    private ServiceBusProcessorClient topicProcessor;

    @Test
    void testSendToQueue() {
        // Given
        TestMessage message = new TestMessage("test", "content");

        // When
        String messageId = serviceBusService.sendToQueue(message);

        // Then
        assertNotNull(messageId);
        verify(queueSender).sendMessage(any(ServiceBusMessage.class));
    }

    @Test
    void testSendToQueueWithScheduledTime() {
        // Given
        TestMessage message = new TestMessage("test", "content");
        Duration delay = Duration.ofMinutes(5);

        // When
        String messageId = serviceBusService.sendToQueue(message, delay);

        // Then
        assertNotNull(messageId);
        verify(queueSender).sendMessage(any(ServiceBusMessage.class));
    }

    @Test
    void testSendBatchToQueue() {
        // Given: a batch that accepts every message; an unstubbed mock sender would return null
        ServiceBusMessageBatch batch = mock(ServiceBusMessageBatch.class);
        when(batch.tryAddMessage(any(ServiceBusMessage.class))).thenReturn(true);
        when(batch.getCount()).thenReturn(3);
        when(queueSender.createMessageBatch()).thenReturn(batch);

        // Declared as List<Object> because that is the element type the service accepts.
        List<Object> messages = Arrays.asList(
            new TestMessage("1", "content1"),
            new TestMessage("2", "content2"),
            new TestMessage("3", "content3")
        );

        // When/Then - Should not throw exception
        assertDoesNotThrow(() -> serviceBusService.sendBatchToQueue(messages));
        verify(queueSender).sendMessages(batch);
    }
}
/**
 * Test double for a message callback: remembers the body of every message it is given and
 * completes the message.
 */
@Component
class TestMessageHandler {

    private final List<String> processedMessages = new ArrayList<>();

    /** Stores the message body as text, then completes the message. */
    public void processMessage(ServiceBusReceivedMessageContext context) {
        processedMessages.add(context.getMessage().getBody().toString());
        context.complete();
    }

    /** Returns the list of recorded bodies (the handler's own list, not a copy). */
    public List<String> getProcessedMessages() {
        return processedMessages;
    }

    /** Forgets all recorded bodies. */
    public void clear() {
        processedMessages.clear();
    }
}
Best Practices
- Reuse Clients: Create Service Bus sender, receiver, and processor clients once and share them instead of building a new client per operation
- Handle Exceptions Gracefully: Implement proper error handling
- Monitor Message Queues: Use Azure Monitor for insights
- Implement Retry Policies: Use exponential backoff for retries
- Use Sessions for Ordered Processing: When message order matters
- Monitor Dead Letter Queues: Regularly check and process DLQ messages
- Use Appropriate Timeouts: Set reasonable timeouts for operations
- Implement Circuit Breaker: For resilient message processing
This implementation provides a comprehensive foundation for using Azure Service Bus in Java applications, covering basic messaging, advanced patterns, Spring Boot integration, and testing strategies.