Pub/Sub Messaging in Java: Comprehensive Guide to Publish-Subscribe Patterns

Pub/Sub (Publish-Subscribe) messaging is a powerful pattern that enables loose coupling between components in distributed systems. Publishers send messages without knowing about subscribers, and subscribers receive messages without knowing about publishers.

Core Pub/Sub Concepts

Key Components

  • Publisher: Sends messages to topics/channels
  • Subscriber: Receives messages from topics/channels
  • Topic/Channel: Logical channel for message distribution
  • Message Broker: Middleware that handles message routing

Implementation Approaches

1. Redis Pub/Sub

Dependencies

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.0</version>
</dependency>

Example 1: Basic Redis Pub/Sub

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
 * Minimal Redis Pub/Sub demo: a subscriber thread listens on the {@code news} channel
 * while a publisher thread sends five messages to it, one per second.
 */
public class RedisPubSubExample {

    /** Prints every message and every subscribe/unsubscribe confirmation it is notified about. */
    public static class MessageHandler extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println("Received message: " + message + " from channel: " + channel);
        }

        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("Subscribed to channel: " + channel);
        }

        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("Unsubscribed from channel: " + channel);
        }
    }

    /**
     * Starts the subscriber first, waits until its subscription is confirmed, then publishes.
     * Redis Pub/Sub does not replay messages, so anything published before the subscription
     * is active would never reach the subscriber.
     */
    public static void main(String[] args) {
        // Released by onSubscribe once Redis has confirmed the subscription.
        java.util.concurrent.CountDownLatch subscribed = new java.util.concurrent.CountDownLatch(1);
        MessageHandler handler = new MessageHandler() {
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                super.onSubscribe(channel, subscribedChannels);
                subscribed.countDown();
            }
        };

        // Subscriber
        new Thread(() -> {
            try (Jedis subscriber = new Jedis("localhost", 6379)) {
                subscriber.subscribe(handler, "news");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // Publisher
        new Thread(() -> {
            try (Jedis publisher = new Jedis("localhost", 6379)) {
                if (!subscribed.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
                    System.err.println("Subscriber not ready after 5 seconds; publishing anyway");
                }
                for (int i = 1; i <= 5; i++) {
                    String message = "Message " + i;
                    publisher.publish("news", message);
                    System.out.println("Published: " + message);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the subscriber so its thread ends and the demo can terminate.
                if (handler.isSubscribed()) {
                    handler.unsubscribe();
                }
            }
        }).start();
    }
}

Example 2: Advanced Redis Pub/Sub with Connection Pool

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import java.time.Duration;
/**
 * Redis Pub/Sub example backed by a {@link JedisPool}: publishes to channels and runs
 * channel and pattern subscriptions on their own threads.
 */
public class RedisPubSubAdvanced {
    private final JedisPool jedisPool;

    /** Builds a pool of up to 10 connections to Redis on localhost:6379. */
    public RedisPubSubAdvanced() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(1);
        poolConfig.setMaxWait(Duration.ofSeconds(30));
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        this.jedisPool = new JedisPool(poolConfig, "localhost", 6379);
    }

    /** Handler that tags its output with a subscriber id and dispatches by channel name. */
    public static class AdvancedMessageHandler extends JedisPubSub {
        private final String subscriberId;

        public AdvancedMessageHandler(String subscriberId) {
            this.subscriberId = subscriberId;
        }

        @Override
        public void onMessage(String channel, String message) {
            System.out.printf("[%s] Received on channel '%s': %s%n",
                    subscriberId, channel, message);
            // Process message based on channel
            switch (channel) {
                case "orders":
                    processOrderMessage(message);
                    break;
                case "notifications":
                    processNotificationMessage(message);
                    break;
                case "alerts":
                    processAlertMessage(message);
                    break;
                default:
                    System.out.println("Unknown channel: " + channel);
            }
        }

        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.printf("[%s] Subscribed to channel: %s (total: %d)%n",
                    subscriberId, channel, subscribedChannels);
        }

        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.printf("[%s] Unsubscribed from channel: %s (remaining: %d)%n",
                    subscriberId, channel, subscribedChannels);
        }

        @Override
        public void onPMessage(String pattern, String channel, String message) {
            System.out.printf("[%s] Pattern match: pattern=%s, channel=%s, message=%s%n",
                    subscriberId, pattern, channel, message);
        }

        private void processOrderMessage(String message) {
            System.out.println("Processing order: " + message);
            // Order processing logic
        }

        private void processNotificationMessage(String message) {
            System.out.println("Sending notification: " + message);
            // Notification logic
        }

        private void processAlertMessage(String message) {
            System.out.println("Handling alert: " + message);
            // Alert handling logic
        }
    }

    /** Publishes {@code message} to {@code channel} using a pooled connection. */
    public void publish(String channel, String message) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.publish(channel, message);
            System.out.println("Published to " + channel + ": " + message);
        }
    }

    /**
     * Subscribes to the given channels on a new thread.
     * NOTE(review): the thread presumably holds its pooled connection for as long as the
     * subscription lasts — confirm against the Jedis documentation.
     */
    public void subscribe(String... channels) {
        new Thread(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                AdvancedMessageHandler handler = new AdvancedMessageHandler("Subscriber-1");
                jedis.subscribe(handler, channels);
            } catch (Exception e) {
                System.err.println("Subscription error: " + e.getMessage());
            }
        }).start();
    }

    /** Subscribes to the given glob-style patterns on a new thread. */
    public void patternSubscribe(String... patterns) {
        new Thread(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                AdvancedMessageHandler handler = new AdvancedMessageHandler("Pattern-Subscriber");
                jedis.psubscribe(handler, patterns);
            } catch (Exception e) {
                System.err.println("Pattern subscription error: " + e.getMessage());
            }
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        RedisPubSubAdvanced pubSub = new RedisPubSubAdvanced();
        // Start subscribers
        pubSub.subscribe("orders", "notifications");
        // Patterns must share the prefix of the channels published below
        // ("alerts:high", "alerts:medium", "logs:app"); "alert:*" / "log:*" match none of them.
        pubSub.patternSubscribe("alerts:*", "logs:*");
        Thread.sleep(1000);
        // Publish messages
        pubSub.publish("orders", "New order: ORDER-123");
        pubSub.publish("notifications", "User registered: USER-456");
        pubSub.publish("alerts:high", "High priority alert!");
        pubSub.publish("alerts:medium", "Medium priority alert");
        pubSub.publish("logs:app", "Application log entry");
        Thread.sleep(5000);
        pubSub.jedisPool.close();
    }
}

2. Spring Framework Pub/Sub

Dependencies

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Example 3: Spring Redis Pub/Sub

// Configuration
/**
 * Spring configuration for Redis Pub/Sub: connection factory, template, topics and the
 * listener container that delivers channel messages to {@code RedisMessageSubscriber}.
 */
@Configuration
@EnableRedisRepositories
public class RedisConfig {

    /** Lettuce connection to a standalone Redis on localhost:6379. */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379));
    }

    /** Template with String keys and JSON-serialized values. */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    @Bean
    public ChannelTopic orderTopic() {
        return new ChannelTopic("orders");
    }

    @Bean
    public ChannelTopic notificationTopic() {
        return new ChannelTopic("notifications");
    }

    /**
     * Listener container with the subscriber registered on both topics.
     * A container with no registered listener subscribes to nothing, so the subscriber
     * would never be invoked.
     *
     * @param subscriber listener that receives messages from the "orders" and
     *                   "notifications" channels
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisMessageSubscriber subscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        container.addMessageListener(subscriber, orderTopic());
        container.addMessageListener(subscriber, notificationTopic());
        return container;
    }
}
// Message Publisher
/**
 * Publishes events to Redis channels through a {@link RedisTemplate}.
 */
@Component
public class RedisMessagePublisher {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ChannelTopic orderTopic;
    private final ChannelTopic notificationTopic;

    public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate,
                                 ChannelTopic orderTopic,
                                 ChannelTopic notificationTopic) {
        this.redisTemplate = redisTemplate;
        this.orderTopic = orderTopic;
        this.notificationTopic = notificationTopic;
    }

    /** Sends the order event to the order topic's channel. */
    public void publishOrder(OrderEvent order) {
        String channel = orderTopic.getTopic();
        redisTemplate.convertAndSend(channel, order);
    }

    /** Sends the notification event to the notification topic's channel. */
    public void publishNotification(NotificationEvent notification) {
        String channel = notificationTopic.getTopic();
        redisTemplate.convertAndSend(channel, notification);
    }

    /** Sends an arbitrary message to the named channel. */
    public void publishToTopic(String topic, Object message) {
        redisTemplate.convertAndSend(topic, message);
    }
}
// Message Subscriber
/**
 * Redis message listener that logs each message, keeps it in an in-memory history and
 * dispatches it by channel name.
 */
@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {

    // Synchronized because onMessage and getMessageHistory may run on different threads.
    private final List<String> messageList =
            java.util.Collections.synchronizedList(new ArrayList<>());

    /**
     * Decodes the channel and body as UTF-8 text, records the body and processes it.
     *
     * @param message the received Redis message
     * @param pattern the pattern that matched the channel, if any (unused here)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Explicit charset: new String(byte[]) alone uses the platform default.
        String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("Received message from channel {}: {}", channel, body);
        messageList.add(body);
        // Process based on channel
        processMessage(channel, body);
    }

    private void processMessage(String channel, String message) {
        switch (channel) {
            case "orders":
                processOrderMessage(message);
                break;
            case "notifications":
                processNotificationMessage(message);
                break;
            default:
                log.warn("Unknown channel: {}", channel);
        }
    }

    private void processOrderMessage(String message) {
        try {
            OrderEvent order = JsonUtils.fromJson(message, OrderEvent.class);
            log.info("Processing order: {}", order.getOrderId());
            // Business logic for order processing
        } catch (Exception e) {
            log.error("Failed to process order message", e);
        }
    }

    private void processNotificationMessage(String message) {
        try {
            NotificationEvent notification = JsonUtils.fromJson(message, NotificationEvent.class);
            log.info("Sending notification to: {}", notification.getRecipient());
            // Notification delivery logic
        } catch (Exception e) {
            log.error("Failed to process notification message", e);
        }
    }

    /** Returns a snapshot copy of every message body received so far. */
    public List<String> getMessageHistory() {
        // Copying iterates the list, which must be guarded manually for synchronizedList.
        synchronized (messageList) {
            return new ArrayList<>(messageList);
        }
    }
}
// Event Classes
/**
 * Event payload describing an order: id, customer id, amount, status and a timestamp.
 */
public class OrderEvent {
private String orderId;
private String customerId;
private BigDecimal amount;
// Assigned by the four-argument constructor; the no-arg constructor does not set it.
private LocalDateTime timestamp;
private OrderStatus status;
// constructors, getters, setters
// No-arg constructor; assigns no fields.
public OrderEvent() {}
// Populates the order fields and stamps the event with LocalDateTime.now().
public OrderEvent(String orderId, String customerId, BigDecimal amount, OrderStatus status) {
this.orderId = orderId;
this.customerId = customerId;
this.amount = amount;
this.status = status;
this.timestamp = LocalDateTime.now();
}
}
/**
 * Event payload describing a notification: id, recipient, message text, type and a timestamp.
 * Constructors and accessors are elided in this listing.
 */
public class NotificationEvent {
private String notificationId;
private String recipient;
private String message;
private NotificationType type;
private LocalDateTime timestamp;
// constructors, getters, setters
}
/** Status values carried by an {@code OrderEvent}. */
enum OrderStatus {
CREATED, PROCESSING, COMPLETED, CANCELLED
}
/** Notification types carried by a {@code NotificationEvent}. */
enum NotificationType {
EMAIL, SMS, PUSH, IN_APP
}
// Service Layer
/**
 * Order operations that announce their outcome by publishing an {@code OrderEvent}.
 */
@Service
@Slf4j
public class OrderService {

    private final RedisMessagePublisher messagePublisher;

    public OrderService(RedisMessagePublisher messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    /**
     * Builds a new order from the request and publishes an order-created event.
     *
     * @param request source of the customer id and amount
     * @return the newly built order
     */
    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        Order created = newOrderFrom(request);
        // Save to database (pseudo-code)
        // orderRepository.save(created);

        // Publish order created event
        OrderEvent createdEvent = new OrderEvent(
                created.getId(),
                created.getCustomerId(),
                created.getAmount(),
                created.getStatus());
        messagePublisher.publishOrder(createdEvent);
        log.info("Published order created event: {}", created.getId());
        return created;
    }

    /**
     * Publishes an order-cancelled event for the given order id.
     * The customer id and amount in the event are placeholders.
     */
    public void cancelOrder(String orderId) {
        // Update order status
        // Order order = orderRepository.findById(orderId).orElseThrow();
        // order.setStatus(OrderStatus.CANCELLED);
        // orderRepository.save(order);

        // Publish order cancelled event
        String placeholderCustomerId = "customer-id"; // Would come from order
        OrderEvent cancelledEvent = new OrderEvent(
                orderId, placeholderCustomerId, BigDecimal.ZERO, OrderStatus.CANCELLED);
        messagePublisher.publishOrder(cancelledEvent);
        log.info("Published order cancelled event: {}", orderId);
    }

    /** Creates an order in CREATED status with a random id and the current time. */
    private Order newOrderFrom(CreateOrderRequest request) {
        Order fresh = new Order();
        fresh.setId(UUID.randomUUID().toString());
        fresh.setCustomerId(request.getCustomerId());
        fresh.setAmount(request.getAmount());
        fresh.setStatus(OrderStatus.CREATED);
        fresh.setCreatedAt(LocalDateTime.now());
        return fresh;
    }
}
// Controller
/**
 * REST endpoints under {@code /api/orders} for creating and cancelling orders.
 * Any exception from the service is logged and mapped to HTTP 500.
 */
@RestController
@RequestMapping("/api/orders")
@Slf4j
public class OrderController {

    private final OrderService orderService;

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    /** Creates an order and returns it with HTTP 200. */
    @PostMapping
    public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
        Order created;
        try {
            created = orderService.createOrder(request);
        } catch (Exception e) {
            log.error("Failed to create order", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return ResponseEntity.ok(created);
    }

    /** Cancels the order with the given id and returns an empty HTTP 200. */
    @PostMapping("/{orderId}/cancel")
    public ResponseEntity<Void> cancelOrder(@PathVariable String orderId) {
        try {
            orderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error("Failed to cancel order", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return ResponseEntity.ok().build();
    }
}

3. Custom In-Memory Pub/Sub

Example 4: Custom Pub/Sub Implementation

// Custom Pub/Sub System
/**
 * In-process publish/subscribe hub. Subscribers register per topic; published messages
 * are delivered to every subscriber of that topic whose message type accepts the message,
 * either on the executor (async) or on the publishing thread (sync).
 */
@Component
@Slf4j
public class InMemoryPubSub {

    private final Map<String, List<Subscriber<?>>> subscribers = new ConcurrentHashMap<>();
    private final ExecutorService executorService;
    private final boolean asyncProcessing;

    /** Creates an asynchronous hub backed by a cached thread pool. */
    public InMemoryPubSub() {
        this(Executors.newCachedThreadPool(), true);
    }

    /**
     * @param executorService executor used for delivery when {@code asyncProcessing} is true
     * @param asyncProcessing true to deliver on the executor, false to deliver inline
     */
    public InMemoryPubSub(ExecutorService executorService, boolean asyncProcessing) {
        this.executorService = executorService;
        this.asyncProcessing = asyncProcessing;
    }

    /** Receiver of messages of type {@code T}. */
    public interface Subscriber<T> {
        void onMessage(String topic, T message);

        /** Runtime type used to filter which messages this subscriber receives. */
        Class<T> getMessageType();

        default String getSubscriberId() {
            return this.getClass().getSimpleName();
        }
    }

    /** Registers {@code subscriber} for {@code topic}. */
    public <T> void subscribe(String topic, Subscriber<T> subscriber) {
        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>())
                .add(subscriber);
        log.info("Subscriber {} subscribed to topic: {}", subscriber.getSubscriberId(), topic);
    }

    /** Removes {@code subscriber} from {@code topic}; does nothing if the topic is unknown. */
    public <T> void unsubscribe(String topic, Subscriber<T> subscriber) {
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        if (topicSubscribers != null) {
            topicSubscribers.remove(subscriber);
            log.info("Subscriber {} unsubscribed from topic: {}", subscriber.getSubscriberId(), topic);
        }
    }

    /**
     * Delivers {@code message} to every subscriber of {@code topic}.
     * Null messages are ignored because no subscriber type can match them.
     */
    public <T> void publish(String topic, T message) {
        if (message == null) {
            log.warn("Ignoring null message published to topic: {}", topic);
            return;
        }
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        if (topicSubscribers == null || topicSubscribers.isEmpty()) {
            log.debug("No subscribers for topic: {}", topic);
            return;
        }
        log.debug("Publishing message to {} subscribers on topic: {}",
                topicSubscribers.size(), topic);
        for (Subscriber<?> subscriber : topicSubscribers) {
            if (asyncProcessing) {
                executorService.submit(() -> deliverMessage(subscriber, topic, message));
            } else {
                deliverMessage(subscriber, topic, message);
            }
        }
    }

    /** Hands the message to one subscriber; type mismatches and failures are only logged. */
    private void deliverMessage(Subscriber<?> subscriber, String topic, Object message) {
        try {
            if (subscriber.getMessageType().isInstance(message)) {
                // Safe: isInstance just confirmed the message matches the subscriber's type.
                @SuppressWarnings("unchecked")
                Subscriber<Object> typed = (Subscriber<Object>) subscriber;
                typed.onMessage(topic, message);
            } else {
                log.warn("Message type mismatch for subscriber {}: expected {}, got {}",
                        subscriber.getSubscriberId(),
                        subscriber.getMessageType(),
                        message.getClass());
            }
        } catch (Exception e) {
            log.error("Error delivering message to subscriber: {}",
                    subscriber.getSubscriberId(), e);
        }
    }

    /** Returns a copy of the topic names that have ever had a subscriber. */
    public Set<String> getTopics() {
        return new HashSet<>(subscribers.keySet());
    }

    /** Returns the number of subscribers on {@code topic}, or 0 for an unknown topic. */
    public int getSubscriberCount(String topic) {
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        return topicSubscribers != null ? topicSubscribers.size() : 0;
    }

    /** Shuts the executor down, waiting up to 30 seconds before forcing termination. */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
// Example Subscribers
/**
 * Subscriber for {@code OrderEvent}s that routes each event by its status.
 * Statuses without a case below (e.g. PROCESSING) are logged and otherwise ignored.
 */
@Component
@Slf4j
public class OrderProcessingSubscriber implements InMemoryPubSub.Subscriber<OrderEvent> {

    @Override
    public Class<OrderEvent> getMessageType() {
        return OrderEvent.class;
    }

    @Override
    public void onMessage(String topic, OrderEvent message) {
        log.info("Processing order: {} with status: {}",
                message.getOrderId(), message.getStatus());
        switch (message.getStatus()) {
            case COMPLETED:
                processCompletedOrder(message);
                break;
            case CANCELLED:
                processCancelledOrder(message);
                break;
            case CREATED:
                processNewOrder(message);
                break;
        }
    }

    /** Handles a newly created order (inventory reservation, payment processing, etc.). */
    private void processNewOrder(OrderEvent order) {
        log.info("Processing new order: {}", order.getOrderId());
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Handles a cancelled order (release inventory, refund payment, etc.). */
    private void processCancelledOrder(OrderEvent order) {
        log.info("Processing cancelled order: {}", order.getOrderId());
    }

    /** Handles a completed order (trigger shipping, send confirmation, etc.). */
    private void processCompletedOrder(OrderEvent order) {
        log.info("Processing completed order: {}", order.getOrderId());
    }
}
/**
 * Subscriber for {@code NotificationEvent}s that logs the notification, pauses briefly to
 * simulate delivery, then logs success.
 */
@Component
@Slf4j
public class NotificationSubscriber implements InMemoryPubSub.Subscriber<NotificationEvent> {

    @Override
    public Class<NotificationEvent> getMessageType() {
        return NotificationEvent.class;
    }

    @Override
    public void onMessage(String topic, NotificationEvent message) {
        log.info("Sending {} notification to {}: {}",
                message.getType(), message.getRecipient(), message.getMessage());
        simulateDelivery();
        log.info("Notification sent successfully: {}", message.getNotificationId());
    }

    /** Sleeps 50 ms; restores the interrupt flag if interrupted. */
    private void simulateDelivery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Subscriber that accepts every message type and counts how many messages it has seen
 * per topic.
 */
@Component
@Slf4j
public class AnalyticsSubscriber implements InMemoryPubSub.Subscriber<Object> {

    private final Map<String, Integer> eventCounts = new ConcurrentHashMap<>();

    @Override
    public void onMessage(String topic, Object message) {
        // Use the value merge returns: a separate get() could observe another thread's
        // later increment and log the wrong total.
        Integer total = eventCounts.merge(topic, 1, Integer::sum);
        log.debug("Analytics: Received message on topic {} (total: {})",
                topic, total);
        // Collect analytics data
        collectAnalytics(topic, message);
    }

    @Override
    public Class<Object> getMessageType() {
        return Object.class; // Accept all message types
    }

    @Override
    public String getSubscriberId() {
        return "AnalyticsSubscriber";
    }

    /** Placeholder for metric collection; currently only logs the topic and message type. */
    private void collectAnalytics(String topic, Object message) {
        // Collect metrics, update dashboards, etc.
        log.info("Collected analytics for topic: {}, message type: {}",
                topic, message.getClass().getSimpleName());
    }

    /** Returns a snapshot copy of the per-topic message counts. */
    public Map<String, Integer> getEventCounts() {
        return new HashMap<>(eventCounts);
    }
}
// Usage Example
/**
 * Wires the in-memory pub/sub hub and registers the example subscribers on their topics.
 *
 * NOTE(review): InMemoryPubSub is itself annotated @Component and is also declared here
 * as a @Bean method named inMemoryPubSub — confirm the two definitions do not clash under
 * component scanning.
 */
@Configuration
@Slf4j
public class PubSubConfiguration {

    @Bean
    public InMemoryPubSub inMemoryPubSub() {
        InMemoryPubSub hub = new InMemoryPubSub();
        return hub;
    }

    /**
     * Registers the subscribers and returns a marker string so the registration can be
     * expressed as a bean.
     */
    @Bean
    @DependsOn("inMemoryPubSub")
    public String setupSubscribers(InMemoryPubSub pubSub,
                                   OrderProcessingSubscriber orderSubscriber,
                                   NotificationSubscriber notificationSubscriber,
                                   AnalyticsSubscriber analyticsSubscriber) {
        // Domain subscribers, one per topic.
        pubSub.subscribe("orders", orderSubscriber);
        pubSub.subscribe("notifications", notificationSubscriber);

        // Analytics listens on every topic.
        pubSub.subscribe("orders", analyticsSubscriber);
        pubSub.subscribe("notifications", analyticsSubscriber);
        pubSub.subscribe("alerts", analyticsSubscriber);

        log.info("Pub/Sub subscribers configured");
        final String marker = "subscribers-configured";
        return marker;
    }
}
/**
 * Facade over {@code InMemoryPubSub} that publishes order, notification and alert events
 * and logs each publication.
 */
@Service
@Slf4j
public class EventPublisherService {

    private final InMemoryPubSub pubSub;

    public EventPublisherService(InMemoryPubSub pubSub) {
        this.pubSub = pubSub;
    }

    /** Publishes the event on the "orders" topic. */
    public void publishOrderEvent(OrderEvent event) {
        pubSub.publish("orders", event);
        log.info("Published order event: {}", event.getOrderId());
    }

    /** Publishes the notification on the "notifications" topic. */
    public void publishNotification(NotificationEvent notification) {
        pubSub.publish("notifications", notification);
        log.info("Published notification: {}", notification.getNotificationId());
    }

    /** Wraps the text in a HIGH-severity alert map and publishes it on the "alerts" topic. */
    public void publishAlert(String alertMessage) {
        String alertId = UUID.randomUUID().toString();
        long createdAt = System.currentTimeMillis();
        Map<String, Object> alert = Map.of(
                "id", alertId,
                "message", alertMessage,
                "timestamp", createdAt,
                "severity", "HIGH");
        pubSub.publish("alerts", alert);
        log.info("Published alert: {}", alertMessage);
    }

    /** Publishes an arbitrary message on the given topic. */
    public void publishToTopic(String topic, Object message) {
        pubSub.publish(topic, message);
        log.info("Published to topic {}: {}", topic, message);
    }
}

4. WebSocket Pub/Sub

Example 5: WebSocket-based Pub/Sub

// WebSocket Configuration
/**
 * Registers the pub/sub WebSocket endpoint at {@code /ws/pubsub}.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final PubSubWebSocketHandler pubSubWebSocketHandler;

    /**
     * @param pubSubWebSocketHandler the handler bean; it must be the same instance that
     *        PubSubController publishes through, otherwise server-side messages go to a
     *        handler that has no sessions
     */
    public WebSocketConfig(PubSubWebSocketHandler pubSubWebSocketHandler) {
        this.pubSubWebSocketHandler = pubSubWebSocketHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // NOTE(review): "*" accepts connections from any origin — restrict in production.
        registry.addHandler(pubSubWebSocketHandler, "/ws/pubsub")
                .setAllowedOrigins("*");
    }
}
// WebSocket Handler
/**
 * WebSocket handler implementing a small JSON pub/sub protocol. Clients send objects with
 * a {@code type} of {@code subscribe}, {@code unsubscribe}, {@code publish} or
 * {@code list_topics}; published messages are broadcast to every session subscribed to
 * the topic.
 */
@Component
@Slf4j
public class PubSubWebSocketHandler extends TextWebSocketHandler {

    private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final Map<String, Set<WebSocketSession>> topicSubscriptions = new ConcurrentHashMap<>();
    // One shared mapper instead of constructing a new one for every message.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Tracks the new session and sends it a {@code connection_established} message. */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        log.info("WebSocket connection established: {}", session.getId());
        // Send welcome message
        sendMessage(session, Map.of(
                "type", "connection_established",
                "sessionId", session.getId(),
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** Parses the JSON payload and dispatches on its {@code type} field. */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> payload = objectMapper.readValue(message.getPayload(), Map.class);
            String type = (String) payload.get("type");
            if (type == null) {
                // switch on a null String would throw NullPointerException.
                sendError(session, "Message type is required");
                return;
            }
            switch (type) {
                case "subscribe":
                    handleSubscribe(session, payload);
                    break;
                case "unsubscribe":
                    handleUnsubscribe(session, payload);
                    break;
                case "publish":
                    handlePublish(session, payload);
                    break;
                case "list_topics":
                    handleListTopics(session);
                    break;
                default:
                    sendError(session, "Unknown message type: " + type);
            }
        } catch (Exception e) {
            log.error("Error handling WebSocket message", e);
            sendError(session, "Invalid message format");
        }
    }

    /** Forgets the session and removes it from every topic it was subscribed to. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        // Remove from all topic subscriptions
        topicSubscriptions.values().forEach(subscribers -> subscribers.remove(session));
        log.info("WebSocket connection closed: {} with status: {}", session.getId(), status);
    }

    private void handleSubscribe(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        if (topic == null || topic.trim().isEmpty()) {
            sendError(session, "Topic is required for subscription");
            return;
        }
        topicSubscriptions.computeIfAbsent(topic, k -> Collections.synchronizedSet(new HashSet<>()))
                .add(session);
        log.info("Session {} subscribed to topic: {}", session.getId(), topic);
        sendMessage(session, Map.of(
                "type", "subscribed",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handleUnsubscribe(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        if (topic == null) {
            sendError(session, "Topic is required for unsubscription");
            return;
        }
        Set<WebSocketSession> subscribers = topicSubscriptions.get(topic);
        if (subscribers != null) {
            subscribers.remove(session);
            log.info("Session {} unsubscribed from topic: {}", session.getId(), topic);
        }
        sendMessage(session, Map.of(
                "type", "unsubscribed",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handlePublish(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        Object message = payload.get("message");
        if (topic == null || message == null) {
            sendError(session, "Topic and message are required for publishing");
            return;
        }
        log.info("Session {} publishing to topic {}: {}", session.getId(), topic, message);
        // Broadcast to all subscribers
        broadcastToTopic(topic, Map.of(
                "type", "message",
                "topic", topic,
                "message", message,
                "publisher", session.getId(),
                "timestamp", System.currentTimeMillis()
        ));
        sendMessage(session, Map.of(
                "type", "published",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handleListTopics(WebSocketSession session) {
        sendMessage(session, Map.of(
                "type", "topics_list",
                "topics", new ArrayList<>(topicSubscriptions.keySet()),
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** Serializes the message once and sends it to every open session subscribed to the topic. */
    private void broadcastToTopic(String topic, Object message) {
        Set<WebSocketSession> subscribers = topicSubscriptions.get(topic);
        if (subscribers == null || subscribers.isEmpty()) {
            return;
        }
        String messageJson;
        try {
            messageJson = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message", e);
            return;
        }
        // NOTE(review): confirm whether concurrent sends to one session need extra
        // synchronization in the WebSocket implementation in use.
        subscribers.forEach(session -> {
            if (session.isOpen()) {
                try {
                    session.sendMessage(new TextMessage(messageJson));
                } catch (IOException e) {
                    log.error("Failed to send message to session: {}", session.getId(), e);
                }
            }
        });
    }

    /** Serializes the message to JSON and sends it to one session; I/O failures are logged. */
    private void sendMessage(WebSocketSession session, Object message) {
        try {
            String messageJson = objectMapper.writeValueAsString(message);
            session.sendMessage(new TextMessage(messageJson));
        } catch (IOException e) {
            log.error("Failed to send message to session: {}", session.getId(), e);
        }
    }

    private void sendError(WebSocketSession session, String errorMessage) {
        sendMessage(session, Map.of(
                "type", "error",
                "message", errorMessage,
                "timestamp", System.currentTimeMillis()
        ));
    }

    // Method for server-side publishing
    /** Broadcasts a message to the topic's subscribers with "server" as the publisher. */
    public void publishToTopic(String topic, Object message) {
        broadcastToTopic(topic, Map.of(
                "type", "message",
                "topic", topic,
                "message", message,
                "publisher", "server",
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** Returns a copy of the names of all topics that have had a subscription. */
    public Set<String> getActiveTopics() {
        return new HashSet<>(topicSubscriptions.keySet());
    }

    /** Returns the number of sessions subscribed to the topic, or 0 if it is unknown. */
    public int getSubscriberCount(String topic) {
        Set<WebSocketSession> subscribers = topicSubscriptions.get(topic);
        return subscribers != null ? subscribers.size() : 0;
    }
}
// REST Controller for Server-Side Publishing
/**
 * REST endpoints under {@code /api/pubsub} for server-side publishing and for inspecting
 * topics of the WebSocket pub/sub handler.
 */
@RestController
@RequestMapping("/api/pubsub")
@Slf4j
public class PubSubController {

    private final PubSubWebSocketHandler webSocketHandler;

    public PubSubController(PubSubWebSocketHandler webSocketHandler) {
        this.webSocketHandler = webSocketHandler;
    }

    /** Publishes the request body to the topic; returns HTTP 500 if publishing throws. */
    @PostMapping("/publish/{topic}")
    public ResponseEntity<Map<String, Object>> publishToTopic(
            @PathVariable String topic,
            @RequestBody Map<String, Object> message) {
        try {
            webSocketHandler.publishToTopic(topic, message);
            Map<String, Object> confirmation = Map.of(
                    "status", "success",
                    "topic", topic,
                    "message", "Message published successfully");
            return ResponseEntity.ok(confirmation);
        } catch (Exception e) {
            log.error("Failed to publish message to topic: {}", topic, e);
            Map<String, Object> failure = Map.of("error", "Failed to publish message");
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(failure);
        }
    }

    /** Lists the handler's active topics together with the current time in milliseconds. */
    @GetMapping("/topics")
    public ResponseEntity<Map<String, Object>> getActiveTopics() {
        Map<String, Object> body = Map.of(
                "topics", webSocketHandler.getActiveTopics(),
                "timestamp", System.currentTimeMillis());
        return ResponseEntity.ok(body);
    }

    /** Reports how many sessions are subscribed to the given topic. */
    @GetMapping("/topics/{topic}/subscribers")
    public ResponseEntity<Map<String, Object>> getSubscriberCount(@PathVariable String topic) {
        Map<String, Object> body = Map.of(
                "topic", topic,
                "subscriberCount", webSocketHandler.getSubscriberCount(topic),
                "timestamp", System.currentTimeMillis());
        return ResponseEntity.ok(body);
    }
}

Best Practices and Patterns

1. Message Serialization

@Component
@Slf4j
public class MessageSerializer {
private final ObjectMapper objectMapper;
public MessageSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
public String serialize(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
log.error("Failed to serialize object: {}", object, e);
throw new SerializationException("Failed to serialize object", e);
}
}
public <T> T deserialize(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
log.error("Failed to deserialize JSON: {}", json, e);
throw new SerializationException("Failed to deserialize JSON", e);
}
}
public <T> T deserialize(String json, TypeReference<T> typeReference) {
try {
return objectMapper.readValue(json, typeReference);
} catch (JsonProcessingException e) {
log.error("Failed to deserialize JSON: {}", json, e);
throw new SerializationException("Failed to deserialize JSON", e);
}
}
}
/**
 * Unchecked exception carrying a message and the underlying cause.
 */
class SerializationException extends RuntimeException {

    /**
     * @param message description of what failed
     * @param cause   the underlying failure
     */
    public SerializationException(String message, Throwable cause) {
        super(message, cause);
    }
}

2. Error Handling and Retry

@Component
@Slf4j
public class ResilientMessageHandler {
private final InMemoryPubSub pubSub;
private final ExecutorService retryExecutor;
public ResilientMessageHandler(InMemoryPubSub pubSub) {
this.pubSub = pubSub;
this.retryExecutor = Executors.newFixedThreadPool(5);
}
public static class RetryableSubscriber<T> implements InMemoryPubSub.Subscriber<T> {
private final InMemoryPubSub.Subscriber<T> delegate;
private final int maxRetries;
private final long retryDelayMs;
public RetryableSubscriber(InMemoryPubSub.Subscriber<T> delegate, int maxRetries, long retryDelayMs) {
this.delegate = delegate;
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
@Override
public void onMessage(String topic, T message) {
executeWithRetry(topic, message, 0);
}
private void executeWithRetry(String topic, T message, int attempt) {
try {
delegate.onMessage(topic, message);
} catch (Exception e) {
if (attempt < maxRetries) {
log.warn("Message processing failed, retrying... (attempt {}/{})", 
attempt + 1, maxRetries);
scheduleRetry(topic, message, attempt + 1);
} else {
log.error("Message processing failed after {} attempts", maxRetries, e);
handleFinalFailure(topic, message, e);
}
}
}
private void scheduleRetry(String topic, T message, int nextAttempt) {
CompletableFuture.delayedExecutor(retryDelayMs, TimeUnit.MILLISECONDS)
.execute(() -> executeWithRetry(topic, message, nextAttempt));
}
private void handleFinalFailure(String topic, T message, Exception error) {
// Send to dead letter queue, alert, etc.
log.error("Final failure for message on topic {}: {}", topic, message, error);
}
@Override
public Class<T> getMessageType() {
return delegate.getMessageType();
}
@Override
public String getSubscriberId() {
return "Retryable-" + delegate.getSubscriberId();
}
}
}

3. Monitoring and Metrics

/**
 * Micrometer metrics for the pub/sub system: counters for published messages, received
 * messages and processing errors, plus a per-topic subscriber gauge.
 *
 * NOTE(review): each counter name is registered both without tags and with tags; some
 * registry backends expect one consistent tag-key set per meter name — verify against
 * the registry in use.
 */
@Component
@Slf4j
public class PubSubMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter messagesPublished;
    private final Counter messagesReceived;
    private final Counter processingErrors;
    private final Map<String, Gauge> topicSubscriberGauges = new ConcurrentHashMap<>();

    public PubSubMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.messagesPublished = Counter.builder("pubsub.messages.published")
                .description("Total number of messages published")
                .register(meterRegistry);
        this.messagesReceived = Counter.builder("pubsub.messages.received")
                .description("Total number of messages received")
                .register(meterRegistry);
        this.processingErrors = Counter.builder("pubsub.processing.errors")
                .description("Total number of message processing errors")
                .register(meterRegistry);
    }

    /** Increments the overall and the per-topic published counters. */
    public void recordMessagePublished(String topic) {
        messagesPublished.increment();
        Counter.builder("pubsub.messages.published")
                .tag("topic", topic)
                .register(meterRegistry)
                .increment();
    }

    /** Increments the overall and the per-topic received counters. */
    public void recordMessageReceived(String topic) {
        messagesReceived.increment();
        Counter.builder("pubsub.messages.received")
                .tag("topic", topic)
                .register(meterRegistry)
                .increment();
    }

    /** Increments the overall and the per-topic/per-subscriber error counters. */
    public void recordProcessingError(String topic, String subscriberId) {
        processingErrors.increment();
        Counter.builder("pubsub.processing.errors")
                .tag("topic", topic)
                .tag("subscriber", subscriberId)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Registers a gauge reporting the subscriber count of {@code topic}.
     *
     * @param topic                   topic used as the gauge's tag value
     * @param subscriberCountSupplier supplies the current subscriber count when sampled
     */
    public void registerTopicSubscriberGauge(String topic, Supplier<Number> subscriberCountSupplier) {
        // The value supplier is an argument of builder(...); register(...) takes only the registry.
        Gauge gauge = Gauge.builder("pubsub.topic.subscribers", subscriberCountSupplier)
                .tag("topic", topic)
                .description("Number of subscribers for topic")
                .register(meterRegistry);
        topicSubscriberGauges.put(topic, gauge);
    }
}

Testing Pub/Sub Systems

Example 6: Testing Pub/Sub

/**
 * Tests for the in-memory pub/sub hub and, in a nested Spring Boot test, for the Redis
 * publisher/subscriber pair.
 */
@ExtendWith(MockitoExtension.class)
class PubSubTest {

    @Test
    void testInMemoryPubSub() {
        // Synchronous delivery: the no-arg constructor delivers on a thread pool, so the
        // assertion below could run before the messages arrive, and in any order.
        InMemoryPubSub pubSub = new InMemoryPubSub(
                java.util.concurrent.Executors.newSingleThreadExecutor(), false);
        try {
            // Test subscriber
            List<String> receivedMessages = new ArrayList<>();
            InMemoryPubSub.Subscriber<String> testSubscriber = new InMemoryPubSub.Subscriber<String>() {
                @Override
                public void onMessage(String topic, String message) {
                    receivedMessages.add(message);
                }

                @Override
                public Class<String> getMessageType() {
                    return String.class;
                }
            };
            // Subscribe and publish
            pubSub.subscribe("test-topic", testSubscriber);
            pubSub.publish("test-topic", "Hello World");
            pubSub.publish("test-topic", "Test Message");
            // Verify
            assertThat(receivedMessages).containsExactly("Hello World", "Test Message");
        } finally {
            pubSub.shutdown();
        }
    }

    /** Publishes an order through Redis and checks that the subscriber recorded a message. */
    @SpringBootTest
    @TestPropertySource(properties = {
            "spring.redis.port=6379"
    })
    static class RedisPubSubIntegrationTest {

        @Autowired
        private RedisMessagePublisher publisher;

        @Autowired
        private RedisMessageSubscriber subscriber;

        @Test
        void testRedisPubSub() throws InterruptedException {
            // Given
            OrderEvent order = new OrderEvent("order-123", "customer-456",
                    new BigDecimal("99.99"), OrderStatus.CREATED);
            // When
            publisher.publishOrder(order);
            // Then - wait for message to be processed
            Thread.sleep(1000);
            List<String> messageHistory = subscriber.getMessageHistory();
            assertThat(messageHistory).isNotEmpty();
        }
    }
}

Conclusion

Pub/Sub messaging in Java offers several implementation options:

Redis Pub/Sub:

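  • ✅ Fast, low-latency delivery
  • ❌ No message persistence: delivery is fire-and-forget, so messages published while a subscriber is disconnected are lost
  • ✅ Cluster support
  • ❌ Requires Redis infrastructure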

Spring Framework:

  • ✅ Tight Spring integration
  • ✅ Rich feature set
  • ✅ Production-ready
  • ❌ Spring ecosystem dependency

Custom In-Memory:

  • ✅ No external dependencies
  • ✅ Full control
  • ✅ Lightweight
  • ❌ Not persistent
  • ❌ Single JVM only

WebSocket:

  • ✅ Real-time browser support
  • ✅ HTTP compatible
  • ✅ Cross-platform
  • ❌ Connection management overhead

Best Practices:

  1. Use appropriate serialization (JSON, Avro, Protobuf)
  2. Implement proper error handling and retry mechanisms
  3. Monitor message rates and system health
  4. Consider message ordering guarantees
  5. Plan for scalability and fault tolerance

Choose the implementation that best fits your requirements for performance, scalability, and operational complexity.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper