Redis Streams for Events in Java: Complete Implementation

Introduction to Redis Streams for Event-Driven Architecture

Redis Streams is an append-only log data structure that supports persistent messaging, consumer groups, and blocking reads for near-real-time processing. These properties make it a good fit for event sourcing, CQRS, and communication between microservices.

Table of Contents

  1. Redis Streams Fundamentals
  2. Project Setup and Dependencies
  3. Event Publishing Service
  4. Event Consumption Patterns
  5. Consumer Groups for Scalability
  6. Dead Letter Queue Handling
  7. Monitoring and Management
  8. Performance Optimization
  9. Integration with Spring Boot
  10. Real-World Use Cases

1. Redis Streams Fundamentals

Key Concepts

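  • Stream: An append-only log of entries stored under a single Redis key
  • Entry: An individual event, made up of a unique ID and a set of field-value pairs
  • Consumer Group: A named set of consumers that share the load; each entry is delivered to only one consumer in the group
  • Pending Entries List (PEL): Entries that have been delivered to a consumer but not yet acknowledged
  • Claiming: Reassigning pending entries from a stalled consumer to another consumer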
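
To make the relationship between these concepts concrete, here is a minimal in-memory sketch. It is an illustration only and does not reflect how Redis stores streams internally; the class MiniStream and all of its methods are hypothetical names, and entry IDs are simplified to plain counters (real IDs have the form <millis>-<seq>).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of stream, consumer group cursor, PEL, ack and claim.
class MiniStream {
    // Append-only log: entry ID -> payload, in insertion order.
    private final Map<Long, String> log = new LinkedHashMap<>();
    // Pending Entries List: entry ID -> consumer that currently owns it.
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextId = 1;         // simplified ID generator
    private long lastDelivered = 0;  // group cursor: highest ID handed to any consumer

    long add(String payload) {
        long id = nextId++;
        log.put(id, payload);
        return id;
    }

    // Deliver up to 'count' never-delivered entries to 'consumer' and record them in the PEL.
    List<Long> readGroup(String consumer, int count) {
        List<Long> delivered = new ArrayList<>();
        for (Long id : log.keySet()) {
            if (delivered.size() == count) {
                break;
            }
            if (id > lastDelivered) {
                pending.put(id, consumer);
                lastDelivered = id;
                delivered.add(id);
            }
        }
        return delivered;
    }

    // Acknowledge: remove the entry from the PEL. Returns true if it was pending.
    boolean ack(long id) {
        return pending.remove(id) != null;
    }

    // Claim: transfer ownership of a pending entry to another consumer.
    boolean claim(long id, String newConsumer) {
        if (!pending.containsKey(id)) {
            return false;
        }
        pending.put(id, newConsumer);
        return true;
    }

    String ownerOf(long id) { return pending.get(id); }
    int pendingCount() { return pending.size(); }
    int length() { return log.size(); }
}
```

Note that acknowledging an entry removes it from the PEL only; the entry itself stays in the log, which is why the stream length and the pending count are tracked separately.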

2. Project Setup and Dependencies

Maven Configuration

<properties>
<jedis.version>5.0.2</jedis.version>
<spring-boot.version>3.1.0</spring-boot.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Redis Client -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
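<!-- Lombok: no <version> is given, so this relies on spring-boot-starter-parent
     or the Spring Boot BOM managing it; add an explicit <version> otherwise -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>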
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
</dependencies>

Application Configuration

# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    timeout: 2000        # milliseconds; bound to an int in RedisConfig
    jedis:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 3000   # milliseconds; bound to a long in RedisConfig
app:
  redis:
    streams:
      user-events: user:events
      order-events: order:events
      payment-events: payment:events
      dlq-events: dlq:events
    consumer-group: event-processors
    consumer-name-prefix: consumer-
    claim-timeout-ms: 30000
    read-timeout-ms: 5000
    batch-size: 100

3. Core Event Model

Base Event Classes

package com.example.redisstreams.events;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import java.time.Instant;
import java.util.UUID;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@class")
public abstract class DomainEvent {
private String eventId;
private String eventType;
private String aggregateId;
private String aggregateType;
private Instant timestamp;
private String correlationId;
private String causationId;
protected DomainEvent() {
this.eventId = UUID.randomUUID().toString();
this.timestamp = Instant.now();
}
protected DomainEvent(String eventType, String aggregateId, String aggregateType) {
this();
this.eventType = eventType;
this.aggregateId = aggregateId;
this.aggregateType = aggregateType;
}
}
// User Domain Events (each public class below goes in its own file in the same package)
@Data
public class UserRegisteredEvent extends DomainEvent {
private String username;
private String email;
private String fullName;
public UserRegisteredEvent() {
super("USER_REGISTERED", null, "USER");
}
public UserRegisteredEvent(String aggregateId, String username, String email, String fullName) {
super("USER_REGISTERED", aggregateId, "USER");
this.username = username;
this.email = email;
this.fullName = fullName;
}
}
@Data
public class UserEmailChangedEvent extends DomainEvent {
private String oldEmail;
private String newEmail;
public UserEmailChangedEvent() {
super("USER_EMAIL_CHANGED", null, "USER");
}
public UserEmailChangedEvent(String aggregateId, String oldEmail, String newEmail) {
super("USER_EMAIL_CHANGED", aggregateId, "USER");
this.oldEmail = oldEmail;
this.newEmail = newEmail;
}
}
// Order Domain Events
@Data
public class OrderCreatedEvent extends DomainEvent {
private String userId;
private Double totalAmount;
private String currency;
public OrderCreatedEvent() {
super("ORDER_CREATED", null, "ORDER");
}
public OrderCreatedEvent(String aggregateId, String userId, Double totalAmount, String currency) {
super("ORDER_CREATED", aggregateId, "ORDER");
this.userId = userId;
this.totalAmount = totalAmount;
this.currency = currency;
}
}
@Data
public class OrderItemAddedEvent extends DomainEvent {
private String productId;
private String productName;
private Integer quantity;
private Double unitPrice;
public OrderItemAddedEvent() {
super("ORDER_ITEM_ADDED", null, "ORDER");
}
public OrderItemAddedEvent(String aggregateId, String productId, String productName, 
Integer quantity, Double unitPrice) {
super("ORDER_ITEM_ADDED", aggregateId, "ORDER");
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
}
@Data
public class OrderConfirmedEvent extends DomainEvent {
public OrderConfirmedEvent() {
super("ORDER_CONFIRMED", null, "ORDER");
}
public OrderConfirmedEvent(String aggregateId) {
super("ORDER_CONFIRMED", aggregateId, "ORDER");
}
}
// Payment Domain Events
@Data
public class PaymentProcessedEvent extends DomainEvent {
private String orderId;
private Double amount;
private String paymentMethod;
private Boolean success;
private String transactionId;
public PaymentProcessedEvent() {
super("PAYMENT_PROCESSED", null, "PAYMENT");
}
public PaymentProcessedEvent(String aggregateId, String orderId, Double amount, 
String paymentMethod, Boolean success, String transactionId) {
super("PAYMENT_PROCESSED", aggregateId, "PAYMENT");
this.orderId = orderId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.success = success;
this.transactionId = transactionId;
}
}
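
DomainEvent carries correlationId and causationId fields, but the listing above never sets them. One common convention, sketched here as an assumption rather than as this project's rule, is that the first event of a flow uses its own ID as the correlation ID, and every follow-up event inherits that correlation ID while recording the ID of the event that triggered it as its causation ID. EventMeta is a hypothetical, trimmed-down stand-in for the metadata part of DomainEvent.

```java
import java.util.UUID;

// Hypothetical stand-in for the metadata fields of DomainEvent.
record EventMeta(String eventId, String correlationId, String causationId) {

    // First event in a flow: starts a new correlation chain and has no cause.
    static EventMeta root() {
        String id = UUID.randomUUID().toString();
        return new EventMeta(id, id, null);
    }

    // Follow-up event: keeps the correlation ID and records this event as its cause.
    EventMeta causedEvent() {
        return new EventMeta(UUID.randomUUID().toString(), correlationId, eventId);
    }
}
```

With this convention, filtering by correlationId returns every event of one business flow, and following causationId links walks the chain back to the event that started it.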

4. Redis Streams Service

Redis Configuration

package com.example.redisstreams.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Configuration
public class RedisConfig {
@Value("${spring.redis.host:localhost}")
private String redisHost;
@Value("${spring.redis.port:6379}")
private int redisPort;
@Value("${spring.redis.password:}")
private String redisPassword;
@Value("${spring.redis.timeout:2000}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-active:20}")
private int maxActive;
@Value("${spring.redis.jedis.pool.max-idle:10}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle:5}")
private int minIdle;
@Value("${spring.redis.jedis.pool.max-wait:3000}")
private long maxWait;
@Bean
public JedisPool jedisPool() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(maxActive);
poolConfig.setMaxIdle(maxIdle);
poolConfig.setMinIdle(minIdle);
poolConfig.setMaxWaitMillis(maxWait);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
if (redisPassword != null && !redisPassword.isEmpty()) {
return new JedisPool(poolConfig, redisHost, redisPort, timeout, redisPassword);
} else {
return new JedisPool(poolConfig, redisHost, redisPort, timeout);
}
}
}

Event Serialization Service

package com.example.redisstreams.serialization;
import com.example.redisstreams.events.DomainEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class EventSerializationService {
private static final Logger logger = LoggerFactory.getLogger(EventSerializationService.class);
private final ObjectMapper objectMapper;
public EventSerializationService() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
// Polymorphic (de)serialization is driven by the @JsonTypeInfo annotation on DomainEvent,
// so global default typing is not needed here (and is best avoided with untrusted input)
}
public String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
logger.error("Failed to serialize event: {}", event.getEventType(), e);
throw new SerializationException("Failed to serialize event", e);
}
}
public DomainEvent deserialize(String json) {
try {
return objectMapper.readValue(json, DomainEvent.class);
} catch (IOException e) {
logger.error("Failed to deserialize event JSON: {}", json, e);
throw new SerializationException("Failed to deserialize event", e);
}
}
public static class SerializationException extends RuntimeException {
public SerializationException(String message) {
super(message);
}
public SerializationException(String message, Throwable cause) {
super(message, cause);
}
}
}

Redis Streams Service

package com.example.redisstreams.service;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.serialization.EventSerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.resps.StreamEntry; // response types live in the resps package since Jedis 4
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.resps.StreamGroupInfo;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class RedisStreamsService {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamsService.class);
private final JedisPool jedisPool;
private final EventSerializationService serializationService;
@Value("${app.redis.streams.user-events:user:events}")
private String userEventsStream;
@Value("${app.redis.streams.order-events:order:events}")
private String orderEventsStream;
@Value("${app.redis.streams.payment-events:payment:events}")
private String paymentEventsStream;
@Value("${app.redis.streams.dlq-events:dlq:events}")
private String dlqStream;
@Value("${app.redis.consumer-group:event-processors}")
private String consumerGroup;
@Value("${app.redis.claim-timeout-ms:30000}")
private long claimTimeoutMs;
@Value("${app.redis.batch-size:100}")
private int batchSize;
public RedisStreamsService(JedisPool jedisPool, EventSerializationService serializationService) {
this.jedisPool = jedisPool;
this.serializationService = serializationService;
}
// Runs after the @Value fields are injected; inside the constructor the stream names would still be null
@jakarta.annotation.PostConstruct
private void initializeConsumerGroups() {
try (Jedis jedis = jedisPool.getResource()) {
createConsumerGroupIfNotExists(jedis, userEventsStream);
createConsumerGroupIfNotExists(jedis, orderEventsStream);
createConsumerGroupIfNotExists(jedis, paymentEventsStream);
createConsumerGroupIfNotExists(jedis, dlqStream);
}
}
private void createConsumerGroupIfNotExists(Jedis jedis, String stream) {
try {
// The final 'true' is MKSTREAM: the stream is created if it does not exist yet.
// new StreamEntryID() is 0-0, so the group starts reading from the beginning of the stream.
jedis.xgroupCreate(stream, consumerGroup, new StreamEntryID(), true);
logger.info("Created consumer group '{}' for stream '{}'", consumerGroup, stream);
} catch (redis.clients.jedis.exceptions.JedisDataException e) {
// Redis replies with BUSYGROUP when the group already exists; anything else is a real error
if (e.getMessage() != null && e.getMessage().contains("BUSYGROUP")) {
logger.debug("Consumer group '{}' already exists for stream '{}'", consumerGroup, stream);
} else {
throw e;
}
}
}
// Event Publishing
public StreamEntryID publishEvent(String stream, DomainEvent event) {
try (Jedis jedis = jedisPool.getResource()) {
String eventJson = serializationService.serialize(event);
Map<String, String> fields = Map.of(
"eventType", event.getEventType(),
"aggregateId", event.getAggregateId(),
"aggregateType", event.getAggregateType(),
"eventData", eventJson,
"timestamp", event.getTimestamp().toString()
);
StreamEntryID entryId = jedis.xadd(stream, StreamEntryID.NEW_ENTRY, fields);
logger.debug("Published event {} to stream {} with ID {}", 
event.getEventType(), stream, entryId);
return entryId;
} catch (Exception e) {
logger.error("Failed to publish event to stream {}: {}", stream, event.getEventType(), e);
throw new StreamOperationException("Failed to publish event", e);
}
}
public StreamEntryID publishUserEvent(DomainEvent event) {
return publishEvent(userEventsStream, event);
}
public StreamEntryID publishOrderEvent(DomainEvent event) {
return publishEvent(orderEventsStream, event);
}
public StreamEntryID publishPaymentEvent(DomainEvent event) {
return publishEvent(paymentEventsStream, event);
}
public StreamEntryID publishToDLQ(DomainEvent event, String originalStream, 
StreamEntryID originalId, String error) {
try (Jedis jedis = jedisPool.getResource()) {
String eventJson = serializationService.serialize(event);
// Map.of rejects null values, and exception messages are frequently null
Map<String, String> fields = Map.of(
"eventType", event.getEventType(),
"aggregateId", event.getAggregateId(),
"originalStream", originalStream,
"originalId", originalId.toString(),
"error", error != null ? error : "unknown error",
"eventData", eventJson,
"timestamp", event.getTimestamp().toString()
);
return jedis.xadd(dlqStream, StreamEntryID.NEW_ENTRY, fields);
} catch (Exception e) {
logger.error("Failed to publish event to DLQ: {}", event.getEventType(), e);
throw new StreamOperationException("Failed to publish to DLQ", e);
}
}
// Event Consumption
public List<StreamEvent> readEvents(String stream, String consumerName) {
try (Jedis jedis = jedisPool.getResource()) {
Map<String, StreamEntryID> streams = Map.of(stream, StreamEntryID.UNRECEIVED_ENTRY);
var params = XReadGroupParams.xReadGroupParams()
.block(5000) // 5 second block
.count(batchSize);
List<Map.Entry<String, List<StreamEntry>>> results = 
jedis.xreadGroup(consumerGroup, consumerName, params, streams);
// Jedis returns null (not an empty list) when the blocking read times out
if (results == null || results.isEmpty()) {
return Collections.emptyList();
}
return results.get(0).getValue().stream()
.map(entry -> new StreamEvent(stream, entry.getID(), entry.getFields()))
.collect(Collectors.toList());
} catch (Exception e) {
logger.error("Failed to read events from stream {}: {}", stream, e.getMessage());
throw new StreamOperationException("Failed to read events", e);
}
}
public List<StreamEvent> readUserEvents(String consumerName) {
return readEvents(userEventsStream, consumerName);
}
public List<StreamEvent> readOrderEvents(String consumerName) {
return readEvents(orderEventsStream, consumerName);
}
public List<StreamEvent> readPaymentEvents(String consumerName) {
return readEvents(paymentEventsStream, consumerName);
}
// Acknowledgment
public void acknowledgeEvent(String stream, StreamEntryID entryId) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.xack(stream, consumerGroup, entryId);
logger.debug("Acknowledged event {} from stream {}", entryId, stream);
} catch (Exception e) {
logger.error("Failed to acknowledge event {} from stream {}", entryId, stream, e);
throw new StreamOperationException("Failed to acknowledge event", e);
}
}
// Claim stalled messages
public List<StreamEvent> claimStalledEvents(String stream, String consumerName) {
try (Jedis jedis = jedisPool.getResource()) {
// List pending entries for the whole group (not just this consumer),
// so entries left behind by a crashed consumer can be taken over
List<redis.clients.jedis.resps.StreamPendingEntry> pending = jedis.xpending(
stream, consumerGroup,
redis.clients.jedis.params.XPendingParams.xPendingParams().count(batchSize));
if (pending == null || pending.isEmpty()) {
return Collections.emptyList();
}
StreamEntryID[] entryIds = pending.stream()
.map(redis.clients.jedis.resps.StreamPendingEntry::getID)
.toArray(StreamEntryID[]::new);
// XCLAIM only transfers entries that have been idle for at least claimTimeoutMs
List<StreamEntry> claimed = jedis.xclaim(
stream, consumerGroup, consumerName, claimTimeoutMs,
redis.clients.jedis.params.XClaimParams.xClaimParams(), entryIds);
return claimed.stream()
.filter(java.util.Objects::nonNull)
.map(entry -> new StreamEvent(stream, entry.getID(), entry.getFields()))
.collect(Collectors.toList());
} catch (Exception e) {
logger.error("Failed to claim stalled events from stream {}: {}", stream, e.getMessage());
throw new StreamOperationException("Failed to claim stalled events", e);
}
}
// Stream information
public long getStreamLength(String stream) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.xlen(stream);
} catch (Exception e) {
logger.error("Failed to get stream length for {}: {}", stream, e.getMessage());
return -1;
}
}
public long getPendingCount(String stream) {
try (Jedis jedis = jedisPool.getResource()) {
// The two-argument XPENDING form returns a summary; getTotal() is the group's pending count
return jedis.xpending(stream, consumerGroup).getTotal();
} catch (Exception e) {
logger.error("Failed to get pending count for stream {}: {}", stream, e.getMessage());
return -1;
}
}
public static class StreamEvent {
private final String stream;
private final StreamEntryID entryId;
private final Map<String, String> fields;
public StreamEvent(String stream, StreamEntryID entryId, Map<String, String> fields) {
this.stream = stream;
this.entryId = entryId;
this.fields = fields;
}
public String getStream() { return stream; }
public StreamEntryID getEntryId() { return entryId; }
public Map<String, String> getFields() { return fields; }
public String getEventType() { return fields.get("eventType"); }
public String getEventData() { return fields.get("eventData"); }
}
public static class StreamOperationException extends RuntimeException {
public StreamOperationException(String message) {
super(message);
}
public StreamOperationException(String message, Throwable cause) {
super(message, cause);
}
}
}
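
publishEvent and publishToDLQ build their field maps with Map.of, which throws a NullPointerException as soon as one value is null. That can happen here, for example when an event was created through a no-arg constructor and its aggregateId is still null. The following is a minimal sketch of a builder that skips null values instead; StreamFields is a hypothetical helper, not part of Jedis or of the service above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects stream entry fields and silently skips null values.
final class StreamFields {
    private final Map<String, String> fields = new LinkedHashMap<>();

    // Adds the field only when a value is present; non-string values are converted via toString().
    StreamFields put(String name, Object value) {
        if (value != null) {
            fields.put(name, value.toString());
        }
        return this;
    }

    // Returns a read-only copy that keeps insertion order.
    Map<String, String> build() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(fields));
    }
}
```

The resulting map has the Map<String, String> shape that the xadd call in publishEvent already takes. Whether a missing field should be skipped or should fail the publish is a design decision; skipping is only one option.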

5. Event Consumers

Event Consumer Interface

package com.example.redisstreams.consumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.service.RedisStreamsService;
public interface EventConsumer {
String getConsumerName();
String getStream();
void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent);
Class<? extends DomainEvent> getEventType();
default boolean supports(DomainEvent event) {
return getEventType().isInstance(event);
}
}

User Events Consumer

package com.example.redisstreams.consumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.events.UserEmailChangedEvent;
import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class UserEventsConsumer implements EventConsumer {
private static final Logger logger = LoggerFactory.getLogger(UserEventsConsumer.class);
private static final String CONSUMER_NAME = "user-events-processor";
private static final String STREAM = "user:events";
@Override
public String getConsumerName() {
return CONSUMER_NAME;
}
@Override
public String getStream() {
return STREAM;
}
@Override
public Class<? extends DomainEvent> getEventType() {
return DomainEvent.class; // Handle all user events
}
@Override
public void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent) {
try {
if (event instanceof UserRegisteredEvent) {
handleUserRegistered((UserRegisteredEvent) event);
} else if (event instanceof UserEmailChangedEvent) {
handleUserEmailChanged((UserEmailChangedEvent) event);
} else {
logger.warn("Unhandled user event type: {}", event.getEventType());
}
logger.info("Processed user event: {} for user {}", 
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
logger.error("Failed to process user event: {}", event.getEventType(), e);
throw new EventProcessingException("Failed to process user event", e);
}
}
private void handleUserRegistered(UserRegisteredEvent event) {
// Business logic for user registration
logger.info("Processing user registration: {} ({})", 
event.getUsername(), event.getEmail());
// Example: Update read model, send welcome email, etc.
// userReadModelService.update(event);
// emailService.sendWelcomeEmail(event.getEmail(), event.getFullName());
}
private void handleUserEmailChanged(UserEmailChangedEvent event) {
// Business logic for email change
logger.info("Processing email change: {} -> {} for user {}", 
event.getOldEmail(), event.getNewEmail(), event.getAggregateId());
// Example: Update read model, send verification email, etc.
// userReadModelService.updateEmail(event.getAggregateId(), event.getNewEmail());
// emailService.sendEmailChangeNotification(event.getOldEmail(), event.getNewEmail());
}
public static class EventProcessingException extends RuntimeException {
public EventProcessingException(String message) {
super(message);
}
public EventProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
}
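
The instanceof chain in processEvent grows with every new event type. As an alternative, handlers can be registered per event class and looked up at dispatch time. This is only a sketch of that idea: EventDispatcher and its methods are hypothetical names, and the example is generic so that it runs without the DomainEvent classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: maps an event class to the handler for that class.
final class EventDispatcher<E> {
    private final Map<Class<? extends E>, Consumer<E>> handlers = new LinkedHashMap<>();

    // Registers a handler for one event type; returns this for chaining.
    <T extends E> EventDispatcher<E> on(Class<T> type, Consumer<? super T> handler) {
        // type.cast is safe here because dispatch only calls the handler after isInstance succeeds
        handlers.put(type, event -> handler.accept(type.cast(event)));
        return this;
    }

    // Runs the first matching handler (in registration order).
    // Returns false when no handler matches, so the caller can log the unhandled type.
    boolean dispatch(E event) {
        for (Map.Entry<Class<? extends E>, Consumer<E>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(event)) {
                entry.getValue().accept(event);
                return true;
            }
        }
        return false;
    }
}
```

In the consumer above, this would correspond to registering handleUserRegistered and handleUserEmailChanged once, and calling dispatch(event) from processEvent.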

Order Events Consumer

package com.example.redisstreams.consumer;
import com.example.redisstreams.events.*;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class OrderEventsConsumer implements EventConsumer {
private static final Logger logger = LoggerFactory.getLogger(OrderEventsConsumer.class);
private static final String CONSUMER_NAME = "order-events-processor";
private static final String STREAM = "order:events";
@Override
public String getConsumerName() {
return CONSUMER_NAME;
}
@Override
public String getStream() {
return STREAM;
}
@Override
public Class<? extends DomainEvent> getEventType() {
return DomainEvent.class; // Handle all order events
}
@Override
public void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent) {
try {
if (event instanceof OrderCreatedEvent) {
handleOrderCreated((OrderCreatedEvent) event);
} else if (event instanceof OrderItemAddedEvent) {
handleOrderItemAdded((OrderItemAddedEvent) event);
} else if (event instanceof OrderConfirmedEvent) {
handleOrderConfirmed((OrderConfirmedEvent) event);
} else {
logger.warn("Unhandled order event type: {}", event.getEventType());
}
logger.info("Processed order event: {} for order {}", 
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
logger.error("Failed to process order event: {}", event.getEventType(), e);
// EventProcessingException is declared as a nested class of UserEventsConsumer
throw new UserEventsConsumer.EventProcessingException("Failed to process order event", e);
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
logger.info("Processing order creation: {} for user {}", 
event.getAggregateId(), event.getUserId());
// Update order read model
// orderReadModelService.createOrder(event);
// Trigger payment process
// paymentService.initiatePayment(event.getAggregateId(), event.getTotalAmount());
}
private void handleOrderItemAdded(OrderItemAddedEvent event) {
logger.info("Processing order item added: {} to order {}", 
event.getProductId(), event.getAggregateId());
// Update order read model with new item
// orderReadModelService.addOrderItem(event.getAggregateId(), event);
}
private void handleOrderConfirmed(OrderConfirmedEvent event) {
logger.info("Processing order confirmation: {}", event.getAggregateId());
// Update order status in read model
// orderReadModelService.confirmOrder(event.getAggregateId());
// Notify inventory system
// inventoryService.reserveItemsForOrder(event.getAggregateId());
}
}
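
Because pending entries can be claimed and processed again, a handler may see the same event more than once, so handlers such as handleOrderCreated should be safe to repeat. One way to approach this, sketched below under the assumption that a single application instance is running, is to remember recently processed event IDs and skip repeats. ProcessedEventGuard is a hypothetical class; with several instances the IDs would need to live in shared storage instead.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory guard that remembers the most recently seen event IDs.
final class ProcessedEventGuard {
    private final Set<String> seen;

    ProcessedEventGuard(int capacity) {
        // Insertion-ordered map that drops its oldest key once capacity is exceeded
        Map<String, Boolean> bounded = new LinkedHashMap<>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        // Synchronized wrapper, since consumers run on several threads
        this.seen = Collections.newSetFromMap(Collections.synchronizedMap(bounded));
    }

    // Returns true the first time an event ID is seen, false for a redelivery.
    boolean firstTime(String eventId) {
        return seen.add(eventId);
    }
}
```

A consumer would call firstTime(event.getEventId()) before running its business logic and simply acknowledge the entry when the call returns false. The bounded capacity means very old IDs are forgotten, which is a trade-off between memory use and protection.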

6. Event Consumer Manager

Event Consumer Manager

package com.example.redisstreams.manager;
import com.example.redisstreams.consumer.EventConsumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.serialization.EventSerializationService;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
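// Spring Boot 3 uses the jakarta.* namespace; javax.annotation is not on the classpath by default
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;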
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class EventConsumerManager {
private static final Logger logger = LoggerFactory.getLogger(EventConsumerManager.class);
private final List<EventConsumer> eventConsumers;
private final RedisStreamsService streamsService;
private final EventSerializationService serializationService;
private final ExecutorService executorService;
private final AtomicBoolean running;
public EventConsumerManager(List<EventConsumer> eventConsumers,
RedisStreamsService streamsService,
EventSerializationService serializationService) {
this.eventConsumers = eventConsumers;
this.streamsService = streamsService;
this.serializationService = serializationService;
this.executorService = Executors.newFixedThreadPool(eventConsumers.size());
this.running = new AtomicBoolean(false);
}
@PostConstruct
public void start() {
logger.info("Starting EventConsumerManager with {} consumers", eventConsumers.size());
running.set(true);
for (EventConsumer consumer : eventConsumers) {
executorService.submit(() -> processEventsForConsumer(consumer));
}
}
@PreDestroy
public void stop() {
logger.info("Stopping EventConsumerManager");
running.set(false);
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
private void processEventsForConsumer(EventConsumer consumer) {
logger.info("Starting event processing for consumer: {}", consumer.getConsumerName());
while (running.get()) {
try {
List<RedisStreamsService.StreamEvent> streamEvents = 
streamsService.readEvents(consumer.getStream(), consumer.getConsumerName());
if (streamEvents.isEmpty()) {
// No events available, sleep briefly
Thread.sleep(100);
continue;
}
for (RedisStreamsService.StreamEvent streamEvent : streamEvents) {
processSingleEvent(consumer, streamEvent);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Event processing interrupted for consumer: {}", consumer.getConsumerName());
break;
} catch (Exception e) {
logger.error("Error processing events for consumer {}: {}", 
consumer.getConsumerName(), e.getMessage(), e);
try {
Thread.sleep(1000); // Backoff on error
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
logger.info("Stopped event processing for consumer: {}", consumer.getConsumerName());
}
private void processSingleEvent(EventConsumer consumer, RedisStreamsService.StreamEvent streamEvent) {
try {
String eventData = streamEvent.getEventData();
DomainEvent event = serializationService.deserialize(eventData);
if (consumer.supports(event)) {
consumer.processEvent(event, streamEvent);
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
logger.debug("Successfully processed and acknowledged event: {}", streamEvent.getEntryId());
} else {
logger.warn("Consumer {} does not support event type: {}", 
consumer.getConsumerName(), event.getEventType());
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
}
} catch (Exception e) {
logger.error("Failed to process event {} from stream {}: {}", 
streamEvent.getEntryId(), streamEvent.getStream(), e.getMessage(), e);
// Move to Dead Letter Queue
handleFailedEvent(consumer, streamEvent, e);
}
}
private void handleFailedEvent(EventConsumer consumer, RedisStreamsService.StreamEvent streamEvent, Exception error) {
try {
DomainEvent event = serializationService.deserialize(streamEvent.getEventData());
streamsService.publishToDLQ(
event, 
streamEvent.getStream(), 
streamEvent.getEntryId(), 
error.getMessage()
);
// Acknowledge the original event to prevent reprocessing
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
logger.warn("Moved failed event {} to DLQ", streamEvent.getEntryId());
} catch (Exception dlqError) {
logger.error("Failed to move event {} to DLQ: {}", streamEvent.getEntryId(), dlqError.getMessage());
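// Don't acknowledge: the entry stays in the PEL and is picked up again by processStalledEvents.
// Note that an entry that cannot be deserialized fails here every time and is never removed.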
}
}
@Scheduled(fixedDelay = 30000) // Every 30 seconds
public void processStalledEvents() {
if (!running.get()) {
return;
}
logger.debug("Checking for stalled events");
for (EventConsumer consumer : eventConsumers) {
try {
List<RedisStreamsService.StreamEvent> stalledEvents = 
streamsService.claimStalledEvents(consumer.getStream(), consumer.getConsumerName());
if (!stalledEvents.isEmpty()) {
logger.info("Claimed {} stalled events for consumer {}", 
stalledEvents.size(), consumer.getConsumerName());
for (RedisStreamsService.StreamEvent streamEvent : stalledEvents) {
processSingleEvent(consumer, streamEvent);
}
}
} catch (Exception e) {
logger.error("Error processing stalled events for consumer {}: {}", 
consumer.getConsumerName(), e.getMessage());
}
}
}
@Scheduled(fixedRate = 60000) // Every minute
public void logConsumerStats() {
if (logger.isDebugEnabled()) {
for (EventConsumer consumer : eventConsumers) {
long streamLength = streamsService.getStreamLength(consumer.getStream());
long pendingCount = streamsService.getPendingCount(consumer.getStream());
logger.debug("Stream {}: length={}, pending={}", 
consumer.getStream(), streamLength, pendingCount);
}
}
}
}
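
The failure handling in processSingleEvent and handleFailedEvent boils down to three outcomes: acknowledge after success, dead-letter and then acknowledge after a processing failure, or leave the entry pending when even the DLQ publish fails. The sketch below isolates that decision so it can be tested without Redis. Outcome and FailurePolicy are hypothetical names, and the two Consumer arguments stand in for the real handler and the DLQ publisher.

```java
import java.util.function.Consumer;

// Possible results of handling one stream entry.
enum Outcome { ACKED, DEAD_LETTERED, LEFT_PENDING }

// Hypothetical helper that mirrors the ack / DLQ / retry decision of the manager.
final class FailurePolicy {

    private FailurePolicy() {
    }

    // handler: business processing for the event
    // deadLetter: publishes the event to the dead letter stream
    static <T> Outcome process(T event, Consumer<T> handler, Consumer<T> deadLetter) {
        try {
            handler.accept(event);
            return Outcome.ACKED;               // caller acknowledges the entry
        } catch (RuntimeException processingError) {
            try {
                deadLetter.accept(event);
                return Outcome.DEAD_LETTERED;   // caller acknowledges the original entry
            } catch (RuntimeException dlqError) {
                return Outcome.LEFT_PENDING;    // no ack: the entry stays in the PEL
            }
        }
    }
}
```

Keeping the decision separate from the Redis calls makes it straightforward to extend later, for instance with a retry count before dead-lettering, which the code above does not do.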

7. REST Controllers

Event Publishing Controller

package com.example.redisstreams.web;
import com.example.redisstreams.events.*;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequestMapping("/api/events")
public class EventController {
private static final Logger logger = LoggerFactory.getLogger(EventController.class);
private final RedisStreamsService streamsService;
public EventController(RedisStreamsService streamsService) {
this.streamsService = streamsService;
}
@PostMapping("/users/register")
public ResponseEntity<EventResponse> registerUser(@RequestBody UserRegistrationRequest request) {
String userId = "USER-" + UUID.randomUUID();
UserRegisteredEvent event = new UserRegisteredEvent(
userId, request.username(), request.email(), request.fullName()
);
var entryId = streamsService.publishUserEvent(event);
logger.info("Published user registration event: {} for user {}", entryId, userId);
return ResponseEntity.ok(new EventResponse(userId, entryId.toString(), "User registered successfully"));
}
@PutMapping("/users/{userId}/email")
public ResponseEntity<EventResponse> changeUserEmail(
@PathVariable String userId,
@RequestBody ChangeEmailRequest request) {
UserEmailChangedEvent event = new UserEmailChangedEvent(
userId, request.oldEmail(), request.newEmail()
);
var entryId = streamsService.publishUserEvent(event);
logger.info("Published email change event: {} for user {}", entryId, userId);
return ResponseEntity.ok(new EventResponse(userId, entryId.toString(), "Email change initiated"));
}
@PostMapping("/orders/create")
public ResponseEntity<EventResponse> createOrder(@RequestBody CreateOrderRequest request) {
String orderId = "ORDER-" + UUID.randomUUID();
OrderCreatedEvent event = new OrderCreatedEvent(
orderId, request.userId(), request.totalAmount(), request.currency()
);
var entryId = streamsService.publishOrderEvent(event);
logger.info("Published order creation event: {} for order {}", entryId, orderId);
return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Order created successfully"));
}
@PostMapping("/orders/{orderId}/items")
public ResponseEntity<EventResponse> addOrderItem(
@PathVariable String orderId,
@RequestBody AddOrderItemRequest request) {
OrderItemAddedEvent event = new OrderItemAddedEvent(
orderId, request.productId(), request.productName(), 
request.quantity(), request.unitPrice()
);
var entryId = streamsService.publishOrderEvent(event);
logger.info("Published order item event: {} for order {}", entryId, orderId);
return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Item added to order"));
}
@PostMapping("/orders/{orderId}/confirm")
public ResponseEntity<EventResponse> confirmOrder(@PathVariable String orderId) {
OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
var entryId = streamsService.publishOrderEvent(event);
logger.info("Published order confirmation event: {} for order {}", entryId, orderId);
return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Order confirmed"));
}
@PostMapping("/payments/process")
public ResponseEntity<EventResponse> processPayment(@RequestBody ProcessPaymentRequest request) {
String paymentId = "PAYMENT-" + UUID.randomUUID();
PaymentProcessedEvent event = new PaymentProcessedEvent(
paymentId, request.orderId(), request.amount(), 
request.paymentMethod(), request.success(), request.transactionId()
);
var entryId = streamsService.publishPaymentEvent(event);
logger.info("Published payment processed event: {} for payment {}", entryId, paymentId);
return ResponseEntity.ok(new EventResponse(paymentId, entryId.toString(), "Payment processed"));
}
// Request/Response DTOs
public record UserRegistrationRequest(String username, String email, String fullName) {}
public record ChangeEmailRequest(String oldEmail, String newEmail) {}
public record CreateOrderRequest(String userId, Double totalAmount, String currency) {}
public record AddOrderItemRequest(String productId, String productName, Integer quantity, Double unitPrice) {}
public record ProcessPaymentRequest(String orderId, Double amount, String paymentMethod, 
Boolean success, String transactionId) {}
public record EventResponse(String entityId, String eventId, String message) {}
}
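
The `eventId` field returned in `EventResponse` is the Redis stream entry ID. When Redis generates the ID itself, it has the form `<millisecondsTime>-<sequenceNumber>`, so a client can recover the approximate publish time from it. The sketch below shows one way to parse such an ID; the `EntryIds` class and `ParsedEntryId` record are hypothetical helpers for illustration and are not part of the service shown above.

```java
import java.time.Instant;

// Hypothetical helper (not part of the article's service): parses auto-generated
// Redis stream entry IDs of the form "<millisecondsTime>-<sequenceNumber>".
final class EntryIds {
    private EntryIds() {}

    /** Parsed form of an entry ID such as "1700000000000-3". */
    record ParsedEntryId(long millis, long sequence) {
        /** Server-side time at which the entry was appended. */
        Instant timestamp() {
            return Instant.ofEpochMilli(millis);
        }
    }

    static ParsedEntryId parse(String entryId) {
        int dash = entryId.indexOf('-');
        if (dash <= 0 || dash == entryId.length() - 1) {
            throw new IllegalArgumentException("Not a stream entry ID: " + entryId);
        }
        try {
            long millis = Long.parseLong(entryId.substring(0, dash));
            long sequence = Long.parseLong(entryId.substring(dash + 1));
            return new ParsedEntryId(millis, sequence);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not a stream entry ID: " + entryId, e);
        }
    }
}
```

This only applies to IDs generated by Redis. If a publisher supplies its own IDs, the first component is not guaranteed to be a wall-clock timestamp.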

Monitoring Controller

package com.example.redisstreams.web;
import com.example.redisstreams.service.RedisStreamsService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@RestController
@RequestMapping("/api/monitoring")
public class MonitoringController {
private final RedisStreamsService streamsService;
public MonitoringController(RedisStreamsService streamsService) {
this.streamsService = streamsService;
}
@GetMapping("/stats")
public Map<String, Object> getStreamStats() {
return Map.of(
"userEventsStream", Map.of(
"length", streamsService.getStreamLength("user:events"),
"pending", streamsService.getPendingCount("user:events")
),
"orderEventsStream", Map.of(
"length", streamsService.getStreamLength("order:events"),
"pending", streamsService.getPendingCount("order:events")
),
"paymentEventsStream", Map.of(
"length", streamsService.getStreamLength("payment:events"),
"pending", streamsService.getPendingCount("payment:events")
),
"dlqStream", Map.of(
"length", streamsService.getStreamLength("dlq:events"),
"pending", streamsService.getPendingCount("dlq:events")
)
);
}
}

8. Testing

Integration Tests with Testcontainers

package com.example.redisstreams.test;
import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
@SpringBootTest
class RedisStreamsIntegrationTest {
@Container
static GenericContainer<?> redis = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
.withExposedPorts(6379);
@Autowired
private RedisStreamsService streamsService;
// Testcontainers does not set Spring properties on its own. Register the container's
// host and mapped port before the application context is created; setting system
// properties in @BeforeEach would be too late because the context already exists.
@DynamicPropertySource
static void redisProperties(DynamicPropertyRegistry registry) {
registry.add("spring.redis.host", redis::getHost);
registry.add("spring.redis.port", redis::getFirstMappedPort);
}
@Test
void testPublishAndReadEvent() {
// Given
String userId = "USER-" + UUID.randomUUID();
UserRegisteredEvent event = new UserRegisteredEvent(userId, "testuser", "testuser@example.com", "Test User");
// When
var entryId = streamsService.publishUserEvent(event);
// Then
assertThat(entryId).isNotNull();
// Read events (this would normally be done by the consumer)
var events = streamsService.readUserEvents("test-consumer");
assertThat(events).isNotEmpty();
}
}

Unit Tests

package com.example.redisstreams.test;
import com.example.redisstreams.consumer.UserEventsConsumer;
import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ExtendWith(MockitoExtension.class)
class UserEventsConsumerTest {
@Mock
private RedisStreamsService streamsService;
@Test
void testProcessUserRegisteredEvent() {
// Given
UserEventsConsumer consumer = new UserEventsConsumer();
UserRegisteredEvent event = new UserRegisteredEvent("USER-123", "john", "john@example.com", "John Doe");
RedisStreamsService.StreamEvent streamEvent = 
new RedisStreamsService.StreamEvent("user:events", null, null);
// When / Then: processing a well-formed event must not throw
assertDoesNotThrow(() -> consumer.processEvent(event, streamEvent));
// A fuller test would also verify interactions with the consumer's dependencies
}
}

9. Spring Boot Application

Main Application Class

package com.example.redisstreams;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class RedisStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(RedisStreamsApplication.class, args);
}
}

10. Performance Optimization and Best Practices

Configuration for High Throughput

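# application-prod.yml
# Note: Spring Boot 3.x auto-configuration reads these connection keys under
# spring.data.redis; keep spring.redis only if the application's own Redis
# configuration binds this prefix.
spring:
  redis:
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD:}
    timeout: 1000ms
    jedis:
      pool:
        max-active: 50
        max-idle: 20
        min-idle: 10
        max-wait: 1000ms
app:
  redis:
    batch-size: 200
    read-timeout-ms: 1000
    claim-timeout-ms: 60000  # 1 minute for production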
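
To make the role of the `app.redis` values above more concrete, here is a minimal sketch of a settings holder that validates them and decides when a pending entry counts as stalled. The `StreamConsumerSettings` record and its `isStalled` method are assumptions made for illustration; the application's actual configuration class may look different.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical holder for the app.redis.* values (batch-size, read-timeout-ms,
// claim-timeout-ms). Not taken from the article's configuration classes.
record StreamConsumerSettings(int batchSize, Duration readTimeout, Duration claimTimeout) {

    StreamConsumerSettings {
        Objects.requireNonNull(readTimeout, "readTimeout");
        Objects.requireNonNull(claimTimeout, "claimTimeout");
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        if (readTimeout.isNegative() || claimTimeout.isNegative()) {
            throw new IllegalArgumentException("timeouts must not be negative");
        }
    }

    /** Builds settings from the millisecond values used in the YAML file. */
    static StreamConsumerSettings ofMillis(int batchSize, long readTimeoutMs, long claimTimeoutMs) {
        return new StreamConsumerSettings(
                batchSize, Duration.ofMillis(readTimeoutMs), Duration.ofMillis(claimTimeoutMs));
    }

    /**
     * Assumption of this sketch: a pending entry becomes a candidate for claiming
     * once it has been idle for at least the claim timeout.
     */
    boolean isStalled(Duration idleTime) {
        return idleTime.compareTo(claimTimeout) >= 0;
    }
}
```

With the production values, `StreamConsumerSettings.ofMillis(200, 1000, 60000)` treats an entry that has been idle for 30 seconds as still in progress and one idle for 60 seconds or more as stalled. A shorter claim timeout recovers from crashed consumers faster but raises the chance of claiming work that a slow consumer is still processing.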

Monitoring and Metrics

package com.example.redisstreams.monitoring;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
public class EventMetrics {
private final MeterRegistry meterRegistry;
private final ConcurrentHashMap<String, Counter> eventCounters;
private final Timer eventProcessingTimer;
public EventMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.eventCounters = new ConcurrentHashMap<>();
this.eventProcessingTimer = Timer.builder("event.processing.time")
.description("Time taken to process events")
.register(meterRegistry);
}
public void recordEventProcessed(String eventType, boolean success, long durationMs) {
String status = success ? "success" : "failure";
// Event counter by type and status
Counter counter = eventCounters.computeIfAbsent(
eventType + "." + status,
key -> Counter.builder("event.processed")
.tag("type", eventType)
.tag("status", status)
.register(meterRegistry)
);
counter.increment();
// Processing time
eventProcessingTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordDlqEvent(String eventType) {
Counter.builder("event.dlq")
.tag("type", eventType)
.register(meterRegistry)
.increment();
}
}
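
The `recordDlqEvent` counter above is meant to be incremented when an event is routed to the dead letter queue. The sketch below illustrates one possible retry-then-DLQ decision based on the delivery count that Redis tracks for each pending entry. `RetryPolicy`, `Outcome`, and `maxDeliveries` are hypothetical names introduced here; the article's consumers may apply a different rule.

```java
import java.util.function.Consumer;

// Hypothetical policy (not from the article's consumer code): run the handler once
// and decide whether the entry was processed, should be retried, or belongs in the DLQ.
final class RetryPolicy {

    enum Outcome { PROCESSED, RETRY_LATER, DEAD_LETTER }

    private final int maxDeliveries;

    RetryPolicy(int maxDeliveries) {
        if (maxDeliveries < 1) {
            throw new IllegalArgumentException("maxDeliveries must be at least 1");
        }
        this.maxDeliveries = maxDeliveries;
    }

    /**
     * @param deliveryCount how many times the entry has been delivered so far,
     *                      including the current attempt
     */
    <T> Outcome handle(T event, long deliveryCount, Consumer<T> handler) {
        try {
            handler.accept(event);
            return Outcome.PROCESSED;          // caller acknowledges the entry
        } catch (RuntimeException e) {
            // Leave the entry pending for another attempt until the limit is reached.
            return deliveryCount >= maxDeliveries ? Outcome.DEAD_LETTER : Outcome.RETRY_LATER;
        }
    }
}
```

A caller would acknowledge the entry on `PROCESSED`, leave it in the pending list on `RETRY_LATER`, and on `DEAD_LETTER` copy it to the DLQ stream, acknowledge the original, and call `recordDlqEvent`.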

Summary

This comprehensive Redis Streams implementation provides:

Key Features:

  • Event Publishing: Reliable event publishing to Redis Streams
  • Consumer Groups: Scalable event processing with multiple consumers
  • Dead Letter Queue: Automatic handling of failed events
  • Stalled Message Recovery: Automatic claiming of stalled messages
  • Monitoring: Comprehensive monitoring and metrics
  • Error Handling: Robust error handling and retry mechanisms

Benefits:

  • Scalability: Horizontal scaling with consumer groups
  • Durability: Events persist in the stream, subject to the Redis persistence settings (RDB snapshots or AOF)
  • Performance: High-throughput event processing
  • Reliability: At-least-once delivery semantics
  • Observability: Comprehensive monitoring and metrics
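
At-least-once delivery means a consumer can receive the same entry more than once, for example after a stalled message is claimed by another consumer. Handlers therefore need to be idempotent. The following is a minimal in-memory sketch of deduplication by entry ID; `IdempotentHandler` and its `capacity` parameter are hypothetical, and a deployment with several consumer instances would need a shared store instead of a local set.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not from the article): skips entries whose ID was already
// handled by this process. State is local and lost on restart.
final class IdempotentHandler {

    private final Set<String> seen;

    IdempotentHandler(int capacity) {
        // Bounded, insertion-ordered set: once full, the oldest ID is evicted first.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /** Runs the action only the first time an entry ID is seen; returns true if it ran. */
    synchronized boolean handleOnce(String entryId, Runnable action) {
        if (seen.contains(entryId)) {
            return false;
        }
        // If the action throws, the ID is not recorded, so a redelivery retries it.
        action.run();
        seen.add(entryId);
        return true;
    }
}
```

The bounded set keeps memory use predictable at the cost of forgetting old IDs, so a very late redelivery could still be processed twice.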

Use Cases:

  • Event Sourcing and CQRS implementations
  • Microservices communication
  • Real-time data processing pipelines
  • Audit logging and compliance
  • Asynchronous workflow processing

This implementation is a solid starting point for production use and can be extended with features such as event schema evolution, richer monitoring, and integration with other systems.
