Pub/Sub (Publish-Subscribe) messaging is a powerful pattern that enables loose coupling between components in distributed systems. Publishers send messages without knowing about subscribers, and subscribers receive messages without knowing about publishers.
Core Pub/Sub Concepts
Key Components
- Publisher: Sends messages to topics/channels
- Subscriber: Receives messages from topics/channels
- Topic/Channel: Logical channel for message distribution
- Message Broker: Middleware that handles message routing
Implementation Approaches
1. Redis Pub/Sub
Dependencies
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.4.0</version> </dependency>
Example 1: Basic Redis Pub/Sub
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
 * Minimal Redis Pub/Sub demo: one subscriber thread listens on the {@code news}
 * channel while one publisher thread sends five messages to it.
 *
 * <p>Redis Pub/Sub does not buffer messages for subscribers that are not yet
 * connected, so the publisher waits until the subscription is confirmed before
 * sending, and releases the subscriber when it is done so the JVM can exit.
 */
public class RedisPubSubExample {

    private static final String CHANNEL = "news";

    /** Prints every Pub/Sub callback and signals once the subscription is active. */
    public static class MessageHandler extends JedisPubSub {

        /** Released by {@link #onSubscribe} when Redis confirms the subscription. */
        private final java.util.concurrent.CountDownLatch subscribed =
                new java.util.concurrent.CountDownLatch(1);

        @Override
        public void onMessage(String channel, String message) {
            System.out.println("Received message: " + message + " from channel: " + channel);
        }

        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("Subscribed to channel: " + channel);
            subscribed.countDown();
        }

        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("Unsubscribed from channel: " + channel);
        }

        /**
         * Blocks until the subscription has been confirmed or the timeout elapses.
         *
         * @return {@code true} if the subscription was confirmed in time
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean awaitSubscribed(long timeout, java.util.concurrent.TimeUnit unit)
                throws InterruptedException {
            return subscribed.await(timeout, unit);
        }
    }

    public static void main(String[] args) {
        MessageHandler handler = new MessageHandler();

        // Subscriber: subscribe() blocks this thread until the handler unsubscribes.
        new Thread(() -> {
            try (Jedis subscriber = new Jedis("localhost", 6379)) {
                subscriber.subscribe(handler, CHANNEL);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "redis-subscriber").start();

        // Publisher: only starts sending once the subscriber is actually listening.
        new Thread(() -> {
            try (Jedis publisher = new Jedis("localhost", 6379)) {
                if (!handler.awaitSubscribed(5, java.util.concurrent.TimeUnit.SECONDS)) {
                    System.err.println("Subscriber not ready after 5s; early messages may be lost");
                }
                for (int i = 1; i <= 5; i++) {
                    String message = "Message " + i;
                    publisher.publish(CHANNEL, message);
                    System.out.println("Published: " + message);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt so the thread's owner can observe it.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Let the blocked subscriber thread return so the program terminates.
                try {
                    if (handler.isSubscribed()) {
                        handler.unsubscribe();
                    }
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        }, "redis-publisher").start();
    }
}
Example 2: Advanced Redis Pub/Sub with Connection Pool
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import java.time.Duration;
/**
 * Redis Pub/Sub demo backed by a {@link JedisPool}: channel subscriptions,
 * pattern subscriptions and pooled publishing.
 *
 * <p>Each subscription blocks one pooled connection on its own thread for as
 * long as it is active; call {@link #shutdown()} to release them.
 */
public class RedisPubSubAdvanced {
    private final JedisPool jedisPool;

    /** Handlers currently blocked in {@code subscribe}, kept so shutdown can release them. */
    private final java.util.List<AdvancedMessageHandler> channelHandlers =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Handlers currently blocked in {@code psubscribe}, kept so shutdown can release them. */
    private final java.util.List<AdvancedMessageHandler> patternHandlers =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Creates the connection pool against {@code localhost:6379}. */
    public RedisPubSubAdvanced() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(1);
        poolConfig.setMaxWait(Duration.ofSeconds(30));
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        this.jedisPool = new JedisPool(poolConfig, "localhost", 6379);
    }

    /** Logs every callback and dispatches channel messages to a per-channel handler. */
    public static class AdvancedMessageHandler extends JedisPubSub {
        private final String subscriberId;

        /** @param subscriberId label used as the prefix of every log line */
        public AdvancedMessageHandler(String subscriberId) {
            this.subscriberId = subscriberId;
        }

        @Override
        public void onMessage(String channel, String message) {
            System.out.printf("[%s] Received on channel '%s': %s%n",
                    subscriberId, channel, message);
            // Process message based on channel
            switch (channel) {
                case "orders":
                    processOrderMessage(message);
                    break;
                case "notifications":
                    processNotificationMessage(message);
                    break;
                case "alerts":
                    processAlertMessage(message);
                    break;
                default:
                    System.out.println("Unknown channel: " + channel);
            }
        }

        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.printf("[%s] Subscribed to channel: %s (total: %d)%n",
                    subscriberId, channel, subscribedChannels);
        }

        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.printf("[%s] Unsubscribed from channel: %s (remaining: %d)%n",
                    subscriberId, channel, subscribedChannels);
        }

        @Override
        public void onPMessage(String pattern, String channel, String message) {
            System.out.printf("[%s] Pattern match: pattern=%s, channel=%s, message=%s%n",
                    subscriberId, pattern, channel, message);
        }

        private void processOrderMessage(String message) {
            System.out.println("Processing order: " + message);
            // Order processing logic
        }

        private void processNotificationMessage(String message) {
            System.out.println("Sending notification: " + message);
            // Notification logic
        }

        private void processAlertMessage(String message) {
            System.out.println("Handling alert: " + message);
            // Alert handling logic
        }
    }

    /** Publishes {@code message} to {@code channel} using a pooled connection. */
    public void publish(String channel, String message) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.publish(channel, message);
            System.out.println("Published to " + channel + ": " + message);
        }
    }

    /**
     * Subscribes to the given channels on a new background thread.
     * The thread stays blocked until {@link #shutdown()} unsubscribes the handler.
     */
    public void subscribe(String... channels) {
        AdvancedMessageHandler handler = new AdvancedMessageHandler("Subscriber-1");
        channelHandlers.add(handler);
        new Thread(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.subscribe(handler, channels);
            } catch (Exception e) {
                System.err.println("Subscription error: " + e.getMessage());
            } finally {
                channelHandlers.remove(handler);
            }
        }).start();
    }

    /**
     * Subscribes to the given glob-style patterns on a new background thread.
     * The thread stays blocked until {@link #shutdown()} unsubscribes the handler.
     */
    public void patternSubscribe(String... patterns) {
        AdvancedMessageHandler handler = new AdvancedMessageHandler("Pattern-Subscriber");
        patternHandlers.add(handler);
        new Thread(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.psubscribe(handler, patterns);
            } catch (Exception e) {
                System.err.println("Pattern subscription error: " + e.getMessage());
            } finally {
                patternHandlers.remove(handler);
            }
        }).start();
    }

    /**
     * Unsubscribes every active handler so the blocked subscriber threads can
     * return their connections, then closes the pool.
     */
    public void shutdown() {
        for (AdvancedMessageHandler handler : channelHandlers) {
            try {
                if (handler.isSubscribed()) {
                    handler.unsubscribe();
                }
            } catch (RuntimeException e) {
                System.err.println("Unsubscribe error: " + e.getMessage());
            }
        }
        for (AdvancedMessageHandler handler : patternHandlers) {
            try {
                if (handler.isSubscribed()) {
                    handler.punsubscribe();
                }
            } catch (RuntimeException e) {
                System.err.println("Pattern unsubscribe error: " + e.getMessage());
            }
        }
        jedisPool.close();
    }

    public static void main(String[] args) throws InterruptedException {
        RedisPubSubAdvanced pubSub = new RedisPubSubAdvanced();
        // Start subscribers
        pubSub.subscribe("orders", "notifications");
        // Patterns must share the exact prefix of the channels published below.
        pubSub.patternSubscribe("alerts:*", "logs:*");
        // Give the subscriber threads time to register before publishing.
        Thread.sleep(1000);
        // Publish messages
        pubSub.publish("orders", "New order: ORDER-123");
        pubSub.publish("notifications", "User registered: USER-456");
        pubSub.publish("alerts:high", "High priority alert!");
        pubSub.publish("alerts:medium", "Medium priority alert");
        pubSub.publish("logs:app", "Application log entry");
        Thread.sleep(5000);
        pubSub.shutdown();
    }
}
2. Spring Framework Pub/Sub
Dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
Example 3: Spring Redis Pub/Sub
// Configuration
/**
 * Spring configuration for Redis Pub/Sub: connection factory, JSON-serializing
 * template, the two topics, and the listener container that delivers incoming
 * messages to {@link RedisMessageSubscriber}.
 */
@Configuration
@EnableRedisRepositories
public class RedisConfig {

    /** Lettuce connection to a standalone Redis at {@code localhost:6379}. */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379));
    }

    /** Template with String keys and JSON-serialized values, used for publishing. */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    /** Topic for order events. */
    @Bean
    public ChannelTopic orderTopic() {
        return new ChannelTopic("orders");
    }

    /** Topic for notification events. */
    @Bean
    public ChannelTopic notificationTopic() {
        return new ChannelTopic("notifications");
    }

    /**
     * Listener container that subscribes to both topics on behalf of the subscriber.
     * Without the {@code addMessageListener} registrations the container would
     * subscribe to nothing and the subscriber would never be invoked.
     *
     * @param subscriber the listener that receives order and notification messages
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisMessageSubscriber subscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        container.addMessageListener(subscriber, orderTopic());
        container.addMessageListener(subscriber, notificationTopic());
        return container;
    }
}
// Message Publisher
/**
 * Publishes events to Redis channels through a {@code RedisTemplate};
 * payloads are serialized by the template's configured value serializer.
 */
@Component
public class RedisMessagePublisher {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ChannelTopic orderTopic;
    private final ChannelTopic notificationTopic;

    /**
     * @param redisTemplate     template used to send every message
     * @param orderTopic        channel that receives order events
     * @param notificationTopic channel that receives notification events
     */
    public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate,
                                 ChannelTopic orderTopic,
                                 ChannelTopic notificationTopic) {
        this.notificationTopic = notificationTopic;
        this.orderTopic = orderTopic;
        this.redisTemplate = redisTemplate;
    }

    /** Sends an order event to the order channel. */
    public void publishOrder(OrderEvent order) {
        String channel = orderTopic.getTopic();
        redisTemplate.convertAndSend(channel, order);
    }

    /** Sends a notification event to the notification channel. */
    public void publishNotification(NotificationEvent notification) {
        String channel = notificationTopic.getTopic();
        redisTemplate.convertAndSend(channel, notification);
    }

    /** Sends an arbitrary payload to the named channel. */
    public void publishToTopic(String topic, Object message) {
        redisTemplate.convertAndSend(topic, message);
    }
}
// Message Subscriber
/**
 * Redis message listener that records every received body and dispatches it
 * to a handler chosen by channel name.
 *
 * <p>The listener may be invoked from container threads while other threads
 * read the history, so the history is kept in a thread-safe list.
 */
@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {

    /** Bodies of all messages received so far, in arrival order. */
    private final List<String> messageList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Decodes the channel and body as UTF-8, records the body and dispatches it.
     *
     * @param message the raw Redis message
     * @param pattern the pattern that matched, if any (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("Received message from channel {}: {}", channel, body);
        messageList.add(body);
        // Process based on channel
        processMessage(channel, body);
    }

    /** Routes the message body to the handler for its channel. */
    private void processMessage(String channel, String message) {
        switch (channel) {
            case "orders":
                processOrderMessage(message);
                break;
            case "notifications":
                processNotificationMessage(message);
                break;
            default:
                log.warn("Unknown channel: {}", channel);
        }
    }

    /** Parses an order event; parse or processing failures are logged, not rethrown. */
    private void processOrderMessage(String message) {
        try {
            OrderEvent order = JsonUtils.fromJson(message, OrderEvent.class);
            log.info("Processing order: {}", order.getOrderId());
            // Business logic for order processing
        } catch (Exception e) {
            log.error("Failed to process order message", e);
        }
    }

    /** Parses a notification event; parse or processing failures are logged, not rethrown. */
    private void processNotificationMessage(String message) {
        try {
            NotificationEvent notification = JsonUtils.fromJson(message, NotificationEvent.class);
            log.info("Sending notification to: {}", notification.getRecipient());
            // Notification delivery logic
        } catch (Exception e) {
            log.error("Failed to process notification message", e);
        }
    }

    /** @return a snapshot copy of the received message bodies */
    public List<String> getMessageHistory() {
        return new ArrayList<>(messageList);
    }
}
// Event Classes
/**
 * Event describing an order and its current status.
 * Accessors are omitted from this listing.
 */
public class OrderEvent {

    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private LocalDateTime timestamp;
    private OrderStatus status;

    // constructors, getters, setters

    /** No-argument constructor; leaves every field unset. */
    public OrderEvent() {
    }

    /** Creates an event for the given order and stamps it with the current local time. */
    public OrderEvent(String orderId, String customerId, BigDecimal amount, OrderStatus status) {
        this.timestamp = LocalDateTime.now();
        this.status = status;
        this.amount = amount;
        this.customerId = customerId;
        this.orderId = orderId;
    }
}
/**
 * Event describing a notification to deliver to a recipient.
 * Constructors and accessors are omitted from this listing.
 */
public class NotificationEvent {

    /** Identifier of this notification. */
    private String notificationId;

    /** Who the notification is addressed to. */
    private String recipient;

    /** Text of the notification. */
    private String message;

    /** Delivery channel. */
    private NotificationType type;

    private LocalDateTime timestamp;

    // constructors, getters, setters
}
/** Lifecycle states of an order. */
enum OrderStatus {
    CREATED,
    PROCESSING,
    COMPLETED,
    CANCELLED
}
/** Delivery channels for a notification. */
enum NotificationType {
    EMAIL,
    SMS,
    PUSH,
    IN_APP
}
// Service Layer
/**
 * Order use cases that announce their outcome by publishing an
 * {@code OrderEvent} through {@code RedisMessagePublisher}.
 * Database persistence is only sketched in comments.
 */
@Service
@Slf4j
public class OrderService {

    private final RedisMessagePublisher messagePublisher;

    public OrderService(RedisMessagePublisher messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    /**
     * Builds a new order from the request and publishes an order-created event.
     *
     * @param request customer id and amount of the order to create
     * @return the newly built order
     */
    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // Create order in database
        Order order = buildOrder(request);
        // Save to database (pseudo-code)
        // orderRepository.save(order);

        // Publish order created event
        messagePublisher.publishOrder(new OrderEvent(
                order.getId(),
                order.getCustomerId(),
                order.getAmount(),
                order.getStatus()));
        log.info("Published order created event: {}", order.getId());
        return order;
    }

    /**
     * Publishes an order-cancelled event for the given order id.
     * The customer id and amount are placeholders until the order lookup is implemented.
     */
    public void cancelOrder(String orderId) {
        // Update order status
        // Order order = orderRepository.findById(orderId).orElseThrow();
        // order.setStatus(OrderStatus.CANCELLED);
        // orderRepository.save(order);

        // Publish order cancelled event
        String customerId = "customer-id"; // Would come from order
        OrderEvent cancelled =
                new OrderEvent(orderId, customerId, BigDecimal.ZERO, OrderStatus.CANCELLED);
        messagePublisher.publishOrder(cancelled);
        log.info("Published order cancelled event: {}", orderId);
    }

    /** Populates a fresh order with a random id, the request data and CREATED status. */
    private Order buildOrder(CreateOrderRequest request) {
        Order created = new Order();
        created.setId(UUID.randomUUID().toString());
        created.setCustomerId(request.getCustomerId());
        created.setAmount(request.getAmount());
        created.setStatus(OrderStatus.CREATED);
        created.setCreatedAt(LocalDateTime.now());
        return created;
    }
}
// Controller
/**
 * REST endpoints for creating and cancelling orders.
 * Any exception from the service is logged and mapped to HTTP 500.
 */
@RestController
@RequestMapping("/api/orders")
@Slf4j
public class OrderController {

    private final OrderService orderService;

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    /** {@code POST /api/orders}: creates an order and returns it with HTTP 200. */
    @PostMapping
    public ResponseEntity<Order> createOrder(@RequestBody CreateOrderRequest request) {
        Order created;
        try {
            created = orderService.createOrder(request);
        } catch (Exception e) {
            log.error("Failed to create order", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return ResponseEntity.ok(created);
    }

    /** {@code POST /api/orders/{orderId}/cancel}: cancels the order, HTTP 200 with no body. */
    @PostMapping("/{orderId}/cancel")
    public ResponseEntity<Void> cancelOrder(@PathVariable String orderId) {
        try {
            orderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error("Failed to cancel order", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return ResponseEntity.ok().build();
    }
}
3. Custom In-Memory Pub/Sub
Example 4: Custom Pub/Sub Implementation
// Custom Pub/Sub System
/**
 * In-process publish/subscribe hub. Subscribers register per topic and receive
 * every message published to that topic whose runtime type matches
 * {@link Subscriber#getMessageType()}.
 *
 * <p>Delivery is either asynchronous (each delivery is submitted to the
 * executor) or synchronous on the publishing thread, as chosen at construction.
 * Exceptions thrown by a subscriber are logged and do not affect other subscribers.
 */
@Component
@Slf4j
public class InMemoryPubSub {

    /** Topic name to its subscribers; lists are copy-on-write so publishing can iterate safely. */
    private final Map<String, List<Subscriber<?>>> subscribers = new ConcurrentHashMap<>();
    private final ExecutorService executorService;
    private final boolean asyncProcessing;

    /** Creates an asynchronous hub backed by a cached thread pool. */
    public InMemoryPubSub() {
        this(Executors.newCachedThreadPool(), true);
    }

    /**
     * @param executorService executor used for asynchronous deliveries and stopped by {@link #shutdown()}
     * @param asyncProcessing {@code true} to deliver via the executor, {@code false} to deliver inline
     */
    public InMemoryPubSub(ExecutorService executorService, boolean asyncProcessing) {
        this.executorService = executorService;
        this.asyncProcessing = asyncProcessing;
    }

    /**
     * Receiver of messages of type {@code T}.
     *
     * @param <T> message type this subscriber accepts
     */
    public interface Subscriber<T> {
        /** Handles one message published to {@code topic}. */
        void onMessage(String topic, T message);

        /** @return the class used to filter messages before delivery */
        Class<T> getMessageType();

        /** @return identifier used in log output; defaults to the simple class name */
        default String getSubscriberId() {
            return this.getClass().getSimpleName();
        }
    }

    /**
     * Registers {@code subscriber} for {@code topic}.
     *
     * @throws NullPointerException if {@code topic} or {@code subscriber} is null
     */
    public <T> void subscribe(String topic, Subscriber<T> subscriber) {
        // Validate before mutating so a null subscriber is never stored in the list.
        java.util.Objects.requireNonNull(topic, "topic");
        java.util.Objects.requireNonNull(subscriber, "subscriber");
        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>())
                .add(subscriber);
        log.info("Subscriber {} subscribed to topic: {}", subscriber.getSubscriberId(), topic);
    }

    /** Removes {@code subscriber} from {@code topic}; does nothing if it was not registered. */
    public <T> void unsubscribe(String topic, Subscriber<T> subscriber) {
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        // Only report an unsubscription that actually happened.
        if (topicSubscribers != null && topicSubscribers.remove(subscriber)) {
            log.info("Subscriber {} unsubscribed from topic: {}", subscriber.getSubscriberId(), topic);
        }
    }

    /**
     * Delivers {@code message} to every subscriber of {@code topic}.
     * Subscribers whose message type does not match are skipped with a warning.
     */
    public <T> void publish(String topic, T message) {
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        if (topicSubscribers == null || topicSubscribers.isEmpty()) {
            log.debug("No subscribers for topic: {}", topic);
            return;
        }
        log.debug("Publishing message to {} subscribers on topic: {}",
                topicSubscribers.size(), topic);
        for (Subscriber<?> subscriber : topicSubscribers) {
            if (asyncProcessing) {
                executorService.submit(() -> deliverMessage(subscriber, topic, message));
            } else {
                deliverMessage(subscriber, topic, message);
            }
        }
    }

    /** Type-checks the message against the subscriber and invokes it, logging any failure. */
    private <T> void deliverMessage(Subscriber<T> subscriber, String topic, Object message) {
        try {
            Class<T> expected = subscriber.getMessageType();
            if (expected.isInstance(message)) {
                subscriber.onMessage(topic, expected.cast(message));
            } else {
                // A null message has no class; report it as such instead of throwing.
                log.warn("Message type mismatch for subscriber {}: expected {}, got {}",
                        subscriber.getSubscriberId(),
                        expected,
                        message == null ? "null" : message.getClass());
            }
        } catch (Exception e) {
            log.error("Error delivering message to subscriber: {}",
                    subscriber.getSubscriberId(), e);
        }
    }

    /** @return a snapshot of the topics that have ever had a subscriber */
    public Set<String> getTopics() {
        return new HashSet<>(subscribers.keySet());
    }

    /** @return the number of subscribers currently registered for {@code topic} */
    public int getSubscriberCount(String topic) {
        List<Subscriber<?>> topicSubscribers = subscribers.get(topic);
        return topicSubscribers != null ? topicSubscribers.size() : 0;
    }

    /**
     * Stops the executor, waiting up to 30 seconds for in-flight deliveries
     * before forcing termination.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
// Example Subscribers
/**
 * Subscriber that reacts to {@code OrderEvent}s according to their status.
 */
@Component
@Slf4j
public class OrderProcessingSubscriber implements InMemoryPubSub.Subscriber<OrderEvent> {

    /**
     * Dispatches the event to the handler for its status. Events without a
     * status, or with a status that has no handler, are logged and skipped.
     */
    @Override
    public void onMessage(String topic, OrderEvent message) {
        OrderStatus status = message.getStatus();
        log.info("Processing order: {} with status: {}",
                message.getOrderId(), status);
        if (status == null) {
            // Switching on a null enum would throw a NullPointerException.
            log.warn("Ignoring order {} without a status", message.getOrderId());
            return;
        }
        switch (status) {
            case CREATED:
                processNewOrder(message);
                break;
            case CANCELLED:
                processCancelledOrder(message);
                break;
            case COMPLETED:
                processCompletedOrder(message);
                break;
            default:
                log.debug("No handler for order {} in status {}", message.getOrderId(), status);
        }
    }

    @Override
    public Class<OrderEvent> getMessageType() {
        return OrderEvent.class;
    }

    private void processNewOrder(OrderEvent order) {
        // Inventory reservation, payment processing, etc.
        log.info("Processing new order: {}", order.getOrderId());
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processCancelledOrder(OrderEvent order) {
        // Release inventory, refund payment, etc.
        log.info("Processing cancelled order: {}", order.getOrderId());
    }

    private void processCompletedOrder(OrderEvent order) {
        // Trigger shipping, send confirmation, etc.
        log.info("Processing completed order: {}", order.getOrderId());
    }
}
/**
 * Subscriber that logs the (simulated) delivery of each {@code NotificationEvent}.
 */
@Component
@Slf4j
public class NotificationSubscriber implements InMemoryPubSub.Subscriber<NotificationEvent> {

    @Override
    public Class<NotificationEvent> getMessageType() {
        return NotificationEvent.class;
    }

    /** Logs the notification, pauses briefly to imitate delivery, then logs completion. */
    @Override
    public void onMessage(String topic, NotificationEvent message) {
        log.info("Sending {} notification to {}: {}",
                message.getType(), message.getRecipient(), message.getMessage());
        simulateDelivery();
        log.info("Notification sent successfully: {}", message.getNotificationId());
    }

    /** Sleeps 50 ms to simulate notification delivery; restores the interrupt flag if interrupted. */
    private void simulateDelivery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Subscriber that accepts every message type and keeps a per-topic event count.
 */
@Component
@Slf4j
public class AnalyticsSubscriber implements InMemoryPubSub.Subscriber<Object> {

    /** Number of messages seen so far, keyed by topic. */
    private final Map<String, Integer> eventCounts = new ConcurrentHashMap<>();

    /** Increments the topic's counter and records analytics for the message. */
    @Override
    public void onMessage(String topic, Object message) {
        // Use the value returned by merge: re-reading the map afterwards could
        // observe increments made concurrently by other deliveries.
        Integer total = eventCounts.merge(topic, 1, Integer::sum);
        log.debug("Analytics: Received message on topic {} (total: {})",
                topic, total);
        // Collect analytics data
        collectAnalytics(topic, message);
    }

    @Override
    public Class<Object> getMessageType() {
        return Object.class; // Accept all message types
    }

    @Override
    public String getSubscriberId() {
        return "AnalyticsSubscriber";
    }

    private void collectAnalytics(String topic, Object message) {
        // Collect metrics, update dashboards, etc.
        log.info("Collected analytics for topic: {}, message type: {}",
                topic, message.getClass().getSimpleName());
    }

    /** @return a snapshot copy of the per-topic counts */
    public Map<String, Integer> getEventCounts() {
        return new HashMap<>(eventCounts);
    }
}
// Usage Example
/**
 * Wires the example subscribers to their topics at application startup.
 *
 * <p>{@code InMemoryPubSub} is itself annotated with {@code @Component}, so it
 * is not declared again here: a second bean definition under the same name
 * {@code inMemoryPubSub} fails startup when bean definition overriding is disabled.
 */
@Configuration
@Slf4j
public class PubSubConfiguration {

    /**
     * Registers the subscribers on the hub. The hub is injected as a parameter,
     * which already guarantees it is created before this method runs.
     *
     * @return a marker value signalling that the subscriptions were set up
     */
    @Bean
    public String setupSubscribers(InMemoryPubSub pubSub,
                                   OrderProcessingSubscriber orderSubscriber,
                                   NotificationSubscriber notificationSubscriber,
                                   AnalyticsSubscriber analyticsSubscriber) {
        // Subscribe to topics
        pubSub.subscribe("orders", orderSubscriber);
        pubSub.subscribe("notifications", notificationSubscriber);
        pubSub.subscribe("orders", analyticsSubscriber);
        pubSub.subscribe("notifications", analyticsSubscriber);
        pubSub.subscribe("alerts", analyticsSubscriber);
        log.info("Pub/Sub subscribers configured");
        return "subscribers-configured";
    }
}
/**
 * Convenience facade that publishes events to fixed topics on the in-memory
 * hub and logs each publication.
 */
@Service
@Slf4j
public class EventPublisherService {

    private final InMemoryPubSub pubSub;

    public EventPublisherService(InMemoryPubSub pubSub) {
        this.pubSub = pubSub;
    }

    /** Publishes the event to the {@code orders} topic. */
    public void publishOrderEvent(OrderEvent event) {
        pubSub.publish("orders", event);
        log.info("Published order event: {}", event.getOrderId());
    }

    /** Publishes the notification to the {@code notifications} topic. */
    public void publishNotification(NotificationEvent notification) {
        pubSub.publish("notifications", notification);
        log.info("Published notification: {}", notification.getNotificationId());
    }

    /**
     * Wraps the text in an alert map (random id, message, current time in
     * milliseconds, severity {@code HIGH}) and publishes it to the {@code alerts} topic.
     */
    public void publishAlert(String alertMessage) {
        String alertId = UUID.randomUUID().toString();
        long createdAt = System.currentTimeMillis();
        Map<String, Object> payload = Map.of(
                "id", alertId,
                "message", alertMessage,
                "timestamp", createdAt,
                "severity", "HIGH");
        pubSub.publish("alerts", payload);
        log.info("Published alert: {}", alertMessage);
    }

    /** Publishes an arbitrary message to the named topic. */
    public void publishToTopic(String topic, Object message) {
        pubSub.publish(topic, message);
        log.info("Published to topic {}: {}", topic, message);
    }
}
4. WebSocket Pub/Sub
Example 5: WebSocket-based Pub/Sub
// WebSocket Configuration
/**
 * Registers the Pub/Sub WebSocket endpoint at {@code /ws/pubsub}.
 *
 * <p>The handler is injected rather than instantiated here so that the endpoint
 * uses the same instance as every other component that depends on
 * {@code PubSubWebSocketHandler}; a separately constructed handler would hold
 * its own sessions and subscriptions.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final PubSubWebSocketHandler pubSubWebSocketHandler;

    /** @param pubSubWebSocketHandler the shared handler bean to expose over WebSocket */
    public WebSocketConfig(PubSubWebSocketHandler pubSubWebSocketHandler) {
        this.pubSubWebSocketHandler = pubSubWebSocketHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // NOTE(review): "*" accepts connections from any origin; restrict this
        // to known origins outside of local development.
        registry.addHandler(pubSubWebSocketHandler, "/ws/pubsub")
                .setAllowedOrigins("*");
    }
}
// WebSocket Handler
/**
 * WebSocket handler implementing a small JSON Pub/Sub protocol. Clients send
 * objects with a {@code type} of {@code subscribe}, {@code unsubscribe},
 * {@code publish} or {@code list_topics}; the server replies with an
 * acknowledgement or an {@code error} object and pushes {@code message}
 * objects to every session subscribed to the topic.
 */
@Component
@Slf4j
public class PubSubWebSocketHandler extends TextWebSocketHandler {

    /** Shared mapper; reused instead of being created for every message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Set<WebSocketSession> sessions = ConcurrentHashMap.newKeySet();

    /** Topic name to its subscribed sessions; a topic is removed once its last session leaves. */
    private final Map<String, Set<WebSocketSession>> topicSubscriptions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        log.info("WebSocket connection established: {}", session.getId());
        // Send welcome message
        sendMessage(session, Map.of(
                "type", "connection_established",
                "sessionId", session.getId(),
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** Parses the JSON payload and dispatches on its {@code type} field. */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        try {
            @SuppressWarnings("unchecked") // JSON objects deserialize to Map<String, Object>
            Map<String, Object> payload = OBJECT_MAPPER.readValue(message.getPayload(), Map.class);
            Object type = payload == null ? null : payload.get("type");
            if (!(type instanceof String)) {
                // Missing or non-string type: report it explicitly instead of via an exception.
                sendError(session, "Invalid message format");
                return;
            }
            switch ((String) type) {
                case "subscribe":
                    handleSubscribe(session, payload);
                    break;
                case "unsubscribe":
                    handleUnsubscribe(session, payload);
                    break;
                case "publish":
                    handlePublish(session, payload);
                    break;
                case "list_topics":
                    handleListTopics(session);
                    break;
                default:
                    sendError(session, "Unknown message type: " + type);
            }
        } catch (Exception e) {
            log.error("Error handling WebSocket message", e);
            sendError(session, "Invalid message format");
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        // Remove from all topic subscriptions
        for (String topic : topicSubscriptions.keySet()) {
            removeSubscription(topic, session);
        }
        log.info("WebSocket connection closed: {} with status: {}", session.getId(), status);
    }

    private void handleSubscribe(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        if (topic == null || topic.trim().isEmpty()) {
            sendError(session, "Topic is required for subscription");
            return;
        }
        // compute() makes create-and-add atomic with respect to pruning in removeSubscription.
        topicSubscriptions.compute(topic, (key, current) -> {
            Set<WebSocketSession> target = current != null ? current : ConcurrentHashMap.newKeySet();
            target.add(session);
            return target;
        });
        log.info("Session {} subscribed to topic: {}", session.getId(), topic);
        sendMessage(session, Map.of(
                "type", "subscribed",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handleUnsubscribe(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        if (topic == null) {
            sendError(session, "Topic is required for unsubscription");
            return;
        }
        if (topicSubscriptions.containsKey(topic)) {
            removeSubscription(topic, session);
            log.info("Session {} unsubscribed from topic: {}", session.getId(), topic);
        }
        sendMessage(session, Map.of(
                "type", "unsubscribed",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handlePublish(WebSocketSession session, Map<String, Object> payload) {
        String topic = (String) payload.get("topic");
        Object message = payload.get("message");
        if (topic == null || message == null) {
            sendError(session, "Topic and message are required for publishing");
            return;
        }
        log.info("Session {} publishing to topic {}: {}", session.getId(), topic, message);
        // Broadcast to all subscribers
        broadcastToTopic(topic, Map.of(
                "type", "message",
                "topic", topic,
                "message", message,
                "publisher", session.getId(),
                "timestamp", System.currentTimeMillis()
        ));
        sendMessage(session, Map.of(
                "type", "published",
                "topic", topic,
                "timestamp", System.currentTimeMillis()
        ));
    }

    private void handleListTopics(WebSocketSession session) {
        sendMessage(session, Map.of(
                "type", "topics_list",
                "topics", new ArrayList<>(topicSubscriptions.keySet()),
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** Removes the session from the topic and drops the topic entry when it becomes empty. */
    private void removeSubscription(String topic, WebSocketSession session) {
        topicSubscriptions.computeIfPresent(topic, (key, current) -> {
            current.remove(session);
            return current.isEmpty() ? null : current;
        });
    }

    /** Serializes the message once and sends it to every open session subscribed to the topic. */
    private void broadcastToTopic(String topic, Object message) {
        Set<WebSocketSession> subscribers = topicSubscriptions.get(topic);
        if (subscribers == null || subscribers.isEmpty()) {
            return;
        }
        String messageJson;
        try {
            messageJson = OBJECT_MAPPER.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message", e);
            return;
        }
        for (WebSocketSession session : subscribers) {
            try {
                sendText(session, messageJson);
            } catch (IOException e) {
                log.error("Failed to send message to session: {}", session.getId(), e);
            }
        }
    }

    private void sendMessage(WebSocketSession session, Object message) {
        try {
            sendText(session, OBJECT_MAPPER.writeValueAsString(message));
        } catch (IOException e) {
            log.error("Failed to send message to session: {}", session.getId(), e);
        }
    }

    /**
     * Writes one text frame to the session if it is still open. Sends are
     * serialized per session because broadcasts and replies can target the
     * same session from different threads.
     */
    private void sendText(WebSocketSession session, String json) throws IOException {
        synchronized (session) {
            if (session.isOpen()) {
                session.sendMessage(new TextMessage(json));
            }
        }
    }

    private void sendError(WebSocketSession session, String errorMessage) {
        sendMessage(session, Map.of(
                "type", "error",
                "message", errorMessage,
                "timestamp", System.currentTimeMillis()
        ));
    }

    // Method for server-side publishing
    /** Broadcasts a server-originated message to every session subscribed to the topic. */
    public void publishToTopic(String topic, Object message) {
        broadcastToTopic(topic, Map.of(
                "type", "message",
                "topic", topic,
                "message", message,
                "publisher", "server",
                "timestamp", System.currentTimeMillis()
        ));
    }

    /** @return a snapshot of the topics that currently have at least one subscriber */
    public Set<String> getActiveTopics() {
        return new HashSet<>(topicSubscriptions.keySet());
    }

    /** @return the number of sessions currently subscribed to {@code topic} */
    public int getSubscriberCount(String topic) {
        Set<WebSocketSession> subscribers = topicSubscriptions.get(topic);
        return subscribers != null ? subscribers.size() : 0;
    }
}
// REST Controller for Server-Side Publishing
/**
 * REST facade over the WebSocket Pub/Sub handler: server-side publishing plus
 * read-only views of topics and subscriber counts.
 */
@RestController
@RequestMapping("/api/pubsub")
@Slf4j
public class PubSubController {

    private final PubSubWebSocketHandler webSocketHandler;

    public PubSubController(PubSubWebSocketHandler webSocketHandler) {
        this.webSocketHandler = webSocketHandler;
    }

    /**
     * {@code POST /api/pubsub/publish/{topic}}: broadcasts the request body to
     * the topic's subscribers. Returns HTTP 500 with an error body on failure.
     */
    @PostMapping("/publish/{topic}")
    public ResponseEntity<Map<String, Object>> publishToTopic(
            @PathVariable String topic,
            @RequestBody Map<String, Object> message) {
        try {
            webSocketHandler.publishToTopic(topic, message);
        } catch (Exception e) {
            log.error("Failed to publish message to topic: {}", topic, e);
            Map<String, Object> failure = Map.of("error", "Failed to publish message");
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(failure);
        }
        Map<String, Object> success = Map.of(
                "status", "success",
                "topic", topic,
                "message", "Message published successfully");
        return ResponseEntity.ok(success);
    }

    /** {@code GET /api/pubsub/topics}: lists the handler's active topics. */
    @GetMapping("/topics")
    public ResponseEntity<Map<String, Object>> getActiveTopics() {
        Map<String, Object> body = Map.of(
                "topics", webSocketHandler.getActiveTopics(),
                "timestamp", System.currentTimeMillis());
        return ResponseEntity.ok(body);
    }

    /** {@code GET /api/pubsub/topics/{topic}/subscribers}: reports the topic's subscriber count. */
    @GetMapping("/topics/{topic}/subscribers")
    public ResponseEntity<Map<String, Object>> getSubscriberCount(@PathVariable String topic) {
        Map<String, Object> body = Map.of(
                "topic", topic,
                "subscriberCount", webSocketHandler.getSubscriberCount(topic),
                "timestamp", System.currentTimeMillis());
        return ResponseEntity.ok(body);
    }
}
Best Practices and Patterns
1. Message Serialization
/**
 * JSON (de)serialization helper built on a Jackson {@code ObjectMapper} with
 * {@code java.time} support and dates written as text rather than timestamps.
 * Jackson failures are logged and rethrown as {@code SerializationException}.
 */
@Component
@Slf4j
public class MessageSerializer {

    private final ObjectMapper objectMapper;

    public MessageSerializer() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        this.objectMapper = mapper;
    }

    /**
     * @param object value to convert to JSON
     * @return the JSON text
     * @throws SerializationException if Jackson cannot serialize the value
     */
    public String serialize(Object object) {
        String json;
        try {
            json = objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize object: {}", object, e);
            throw new SerializationException("Failed to serialize object", e);
        }
        return json;
    }

    /**
     * @param json  JSON text to parse
     * @param clazz target class
     * @return the parsed value
     * @throws SerializationException if the JSON cannot be mapped to {@code clazz}
     */
    public <T> T deserialize(String json, Class<T> clazz) {
        T parsed;
        try {
            parsed = objectMapper.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            log.error("Failed to deserialize JSON: {}", json, e);
            throw new SerializationException("Failed to deserialize JSON", e);
        }
        return parsed;
    }

    /**
     * @param json          JSON text to parse
     * @param typeReference target generic type
     * @return the parsed value
     * @throws SerializationException if the JSON cannot be mapped to the type
     */
    public <T> T deserialize(String json, TypeReference<T> typeReference) {
        T parsed;
        try {
            parsed = objectMapper.readValue(json, typeReference);
        } catch (JsonProcessingException e) {
            log.error("Failed to deserialize JSON: {}", json, e);
            throw new SerializationException("Failed to deserialize JSON", e);
        }
        return parsed;
    }
}
/**
 * Unchecked exception raised when a message cannot be converted to or from
 * its serialized form; always carries the underlying cause.
 */
class SerializationException extends RuntimeException {

    /**
     * @param message description of what failed
     * @param cause   the underlying failure
     */
    public SerializationException(String message, Throwable cause) {
        super(message, cause);
    }
}
2. Error Handling and Retry
@Component
@Slf4j
public class ResilientMessageHandler {
private final InMemoryPubSub pubSub;
private final ExecutorService retryExecutor;
public ResilientMessageHandler(InMemoryPubSub pubSub) {
this.pubSub = pubSub;
this.retryExecutor = Executors.newFixedThreadPool(5);
}
public static class RetryableSubscriber<T> implements InMemoryPubSub.Subscriber<T> {
private final InMemoryPubSub.Subscriber<T> delegate;
private final int maxRetries;
private final long retryDelayMs;
public RetryableSubscriber(InMemoryPubSub.Subscriber<T> delegate, int maxRetries, long retryDelayMs) {
this.delegate = delegate;
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
@Override
public void onMessage(String topic, T message) {
executeWithRetry(topic, message, 0);
}
private void executeWithRetry(String topic, T message, int attempt) {
try {
delegate.onMessage(topic, message);
} catch (Exception e) {
if (attempt < maxRetries) {
log.warn("Message processing failed, retrying... (attempt {}/{})",
attempt + 1, maxRetries);
scheduleRetry(topic, message, attempt + 1);
} else {
log.error("Message processing failed after {} attempts", maxRetries, e);
handleFinalFailure(topic, message, e);
}
}
}
private void scheduleRetry(String topic, T message, int nextAttempt) {
CompletableFuture.delayedExecutor(retryDelayMs, TimeUnit.MILLISECONDS)
.execute(() -> executeWithRetry(topic, message, nextAttempt));
}
private void handleFinalFailure(String topic, T message, Exception error) {
// Send to dead letter queue, alert, etc.
log.error("Final failure for message on topic {}: {}", topic, message, error);
}
@Override
public Class<T> getMessageType() {
return delegate.getMessageType();
}
@Override
public String getSubscriberId() {
return "Retryable-" + delegate.getSubscriberId();
}
}
}
3. Monitoring and Metrics
/**
 * Micrometer metrics for the Pub/Sub system.
 *
 * <p>Each counter is registered under one fixed set of tag keys. Totals are
 * obtained by aggregating across tags in the monitoring backend, so no
 * separate untagged meter shares the same name: mixing tag sets under one
 * name is rejected by some registries and would double count.
 *
 * <p>NOTE(review): topic and subscriber ids become tag values; keep their
 * cardinality bounded.
 */
@Component
@Slf4j
public class PubSubMetrics {
    private final MeterRegistry meterRegistry;

    /** Gauges already registered, keyed by topic. */
    private final Map<String, Gauge> topicSubscriberGauges = new ConcurrentHashMap<>();

    public PubSubMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /** Increments {@code pubsub.messages.published} for the topic. */
    public void recordMessagePublished(String topic) {
        Counter.builder("pubsub.messages.published")
                .description("Total number of messages published")
                .tag("topic", topic)
                .register(meterRegistry)
                .increment();
    }

    /** Increments {@code pubsub.messages.received} for the topic. */
    public void recordMessageReceived(String topic) {
        Counter.builder("pubsub.messages.received")
                .description("Total number of messages received")
                .tag("topic", topic)
                .register(meterRegistry)
                .increment();
    }

    /** Increments {@code pubsub.processing.errors} for the topic and subscriber. */
    public void recordProcessingError(String topic, String subscriberId) {
        Counter.builder("pubsub.processing.errors")
                .description("Total number of message processing errors")
                .tag("topic", topic)
                .tag("subscriber", subscriberId)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Registers a {@code pubsub.topic.subscribers} gauge for the topic that
     * reads its value from the supplier. A topic is registered at most once;
     * later calls for the same topic are ignored.
     */
    public void registerTopicSubscriberGauge(String topic, Supplier<Number> subscriberCountSupplier) {
        topicSubscriberGauges.computeIfAbsent(topic, key ->
                Gauge.builder("pubsub.topic.subscribers", subscriberCountSupplier)
                        .tag("topic", key)
                        .description("Number of subscribers for topic")
                        .register(meterRegistry));
    }
}
Testing Pub/Sub Systems
Example 6: Testing Pub/Sub
/**
 * Tests for the in-memory hub and, in the nested class, the Redis-backed
 * publisher/subscriber pair.
 */
@ExtendWith(MockitoExtension.class)
class PubSubTest {

    /** Messages published to a topic reach its subscriber in publication order. */
    @Test
    void testInMemoryPubSub() {
        // Synchronous delivery: messages arrive on this thread, in order, before
        // publish() returns, so the assertion below is deterministic.
        InMemoryPubSub pubSub =
                new InMemoryPubSub(java.util.concurrent.Executors.newSingleThreadExecutor(), false);
        try {
            // Test subscriber
            List<String> receivedMessages = new ArrayList<>();
            InMemoryPubSub.Subscriber<String> testSubscriber = new InMemoryPubSub.Subscriber<String>() {
                @Override
                public void onMessage(String topic, String message) {
                    receivedMessages.add(message);
                }

                @Override
                public Class<String> getMessageType() {
                    return String.class;
                }
            };
            // Subscribe and publish
            pubSub.subscribe("test-topic", testSubscriber);
            pubSub.publish("test-topic", "Hello World");
            pubSub.publish("test-topic", "Test Message");
            // Verify
            assertThat(receivedMessages).containsExactly("Hello World", "Test Message");
        } finally {
            pubSub.shutdown();
        }
    }

    /** Integration test; requires a Redis server reachable on port 6379. */
    @SpringBootTest
    @TestPropertySource(properties = {
            "spring.redis.port=6379"
    })
    static class RedisPubSubIntegrationTest {
        @Autowired
        private RedisMessagePublisher publisher;
        @Autowired
        private RedisMessageSubscriber subscriber;

        @Test
        void testRedisPubSub() throws InterruptedException {
            // Given
            OrderEvent order = new OrderEvent("order-123", "customer-456",
                    new BigDecimal("99.99"), OrderStatus.CREATED);
            // When
            publisher.publishOrder(order);
            // Then - poll until the message arrives instead of sleeping a fixed time
            long deadline = System.currentTimeMillis() + 5_000;
            while (subscriber.getMessageHistory().isEmpty()
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(50);
            }
            List<String> messageHistory = subscriber.getMessageHistory();
            assertThat(messageHistory).isNotEmpty();
        }
    }
}
Conclusion
Pub/Sub messaging in Java offers several implementation options:
Redis Pub/Sub:
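- ✅ Fast, low-latency delivery (at-most-once: a message is not retried or redelivered)
- ✅ Persistent storage is available through other Redis features such as Streams (plain Pub/Sub messages are not stored, so subscribers that are offline miss them)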
- ✅ Cluster support
- ❌ Requires Redis infrastructure
Spring Framework:
- ✅ Tight Spring integration
- ✅ Rich feature set
- ✅ Production-ready
- ❌ Spring ecosystem dependency
Custom In-Memory:
- ✅ No external dependencies
- ✅ Full control
- ✅ Lightweight
- ❌ Not persistent
- ❌ Single JVM only
WebSocket:
- ✅ Real-time browser support
- ✅ HTTP compatible
- ✅ Cross-platform
- ❌ Connection management overhead
Best Practices:
- Use appropriate serialization (JSON, Avro, Protobuf)
- Implement proper error handling and retry mechanisms
- Monitor message rates and system health
- Consider message ordering guarantees
- Plan for scalability and fault tolerance
Choose the implementation that best fits your requirements for performance, scalability, and operational complexity.