Introduction to Redis Streams for Event-Driven Architecture
Redis Streams is an append-only log data type that enables event-driven architectures with persistent messaging, consumer groups, and real-time processing. It is well suited to event sourcing, CQRS, and microservice-to-microservice communication.
Table of Contents
- Redis Streams Fundamentals
- Project Setup and Dependencies
- Core Event Model
- Redis Streams Service
- Event Consumers
- Event Consumer Manager
- REST Controllers
1. Redis Streams Fundamentals
Key Concepts
- Stream: Append-only log of events/messages
- Entry: Individual event with ID and key-value pairs
- Consumer Group: Multiple consumers sharing load
- Pending Entries List (PEL): Messages delivered but not acknowledged
- Claiming: Reassigning stalled messages to other consumers
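The ordering these concepts rely on is visible in the entry ID itself: every entry gets an ID of the form millisecondsTime-sequenceNumber, and Redis guarantees IDs only ever increase, so a consumer can resume from the last ID it saw. A plain-Java sketch of that ordering (no Redis client required; EntryIdDemo and compareIds are illustrative names):

```java
// Sketch: how Redis stream entry IDs ("<ms>-<seq>") order. Plain Java, no Redis.
final class EntryIdDemo {
    // Compare two entry IDs the way Redis does: by timestamp, then by sequence.
    static int compareIds(String a, String b) {
        String[] pa = a.split("-"), pb = b.split("-");
        int byTime = Long.compare(Long.parseLong(pa[0]), Long.parseLong(pb[0]));
        return byTime != 0 ? byTime : Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
    }

    public static void main(String[] args) {
        System.out.println(compareIds("1712345678901-0", "1712345678901-1")); // -1 (same ms, lower seq first)
        System.out.println(compareIds("1712345678902-0", "1712345678901-9")); // 1 (later timestamp wins)
    }
}
```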
2. Project Setup and Dependencies
Maven Configuration
<properties>
<jedis.version>5.0.2</jedis.version>
<spring-boot.version>3.1.0</spring-boot.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Redis Client -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
</dependencies>
Application Configuration
# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    timeout: 2000ms
    jedis:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 3000ms
app:
  redis:
    streams:
      user-events: user:events
      order-events: order:events
      payment-events: payment:events
      dlq-events: dlq:events
    consumer-group: event-processors
    consumer-name-prefix: consumer-
    claim-timeout-ms: 30000
    read-timeout-ms: 5000
    batch-size: 100
3. Core Event Model
Base Event Classes
package com.example.redisstreams.events;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import java.time.Instant;
import java.util.UUID;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@class")
public abstract class DomainEvent {
private String eventId;
private String eventType;
private String aggregateId;
private String aggregateType;
private Instant timestamp;
private String correlationId;
private String causationId;
protected DomainEvent() {
this.eventId = UUID.randomUUID().toString();
this.timestamp = Instant.now();
}
protected DomainEvent(String eventType, String aggregateId, String aggregateType) {
this();
this.eventType = eventType;
this.aggregateId = aggregateId;
this.aggregateType = aggregateType;
}
}
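The correlationId/causationId fields above follow a common event-sourcing convention: a derived event keeps the triggering event's correlation ID (or starts one from the root event's ID) and records the trigger's own ID as its causation ID, so a whole workflow can be traced. A minimal stand-alone sketch of that convention (the Evt class and causedBy helper are hypothetical, not part of the code above):

```java
import java.util.UUID;

// Sketch of the correlation/causation chaining convention (hypothetical helper).
final class CausationDemo {
    static class Evt {
        String eventId = UUID.randomUUID().toString();
        String correlationId; // shared by every event in one workflow
        String causationId;   // eventId of the event that directly caused this one
    }

    static Evt causedBy(Evt cause) {
        Evt next = new Evt();
        // The root event has no correlationId yet, so its own eventId starts the chain.
        next.correlationId = cause.correlationId != null ? cause.correlationId : cause.eventId;
        next.causationId = cause.eventId;
        return next;
    }

    public static void main(String[] args) {
        Evt root = new Evt();
        Evt child = causedBy(root);
        Evt grandchild = causedBy(child);
        System.out.println(grandchild.correlationId.equals(root.eventId)); // true
        System.out.println(grandchild.causationId.equals(child.eventId));  // true
    }
}
```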
// User Domain Events
@Data
public class UserRegisteredEvent extends DomainEvent {
private String username;
private String email;
private String fullName;
public UserRegisteredEvent() {
super("USER_REGISTERED", null, "USER");
}
public UserRegisteredEvent(String aggregateId, String username, String email, String fullName) {
super("USER_REGISTERED", aggregateId, "USER");
this.username = username;
this.email = email;
this.fullName = fullName;
}
}
@Data
public class UserEmailChangedEvent extends DomainEvent {
private String oldEmail;
private String newEmail;
public UserEmailChangedEvent() {
super("USER_EMAIL_CHANGED", null, "USER");
}
public UserEmailChangedEvent(String aggregateId, String oldEmail, String newEmail) {
super("USER_EMAIL_CHANGED", aggregateId, "USER");
this.oldEmail = oldEmail;
this.newEmail = newEmail;
}
}
// Order Domain Events
@Data
public class OrderCreatedEvent extends DomainEvent {
private String userId;
private Double totalAmount;
private String currency;
public OrderCreatedEvent() {
super("ORDER_CREATED", null, "ORDER");
}
public OrderCreatedEvent(String aggregateId, String userId, Double totalAmount, String currency) {
super("ORDER_CREATED", aggregateId, "ORDER");
this.userId = userId;
this.totalAmount = totalAmount;
this.currency = currency;
}
}
@Data
public class OrderItemAddedEvent extends DomainEvent {
private String productId;
private String productName;
private Integer quantity;
private Double unitPrice;
public OrderItemAddedEvent() {
super("ORDER_ITEM_ADDED", null, "ORDER");
}
public OrderItemAddedEvent(String aggregateId, String productId, String productName,
Integer quantity, Double unitPrice) {
super("ORDER_ITEM_ADDED", aggregateId, "ORDER");
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
}
@Data
public class OrderConfirmedEvent extends DomainEvent {
public OrderConfirmedEvent() {
super("ORDER_CONFIRMED", null, "ORDER");
}
public OrderConfirmedEvent(String aggregateId) {
super("ORDER_CONFIRMED", aggregateId, "ORDER");
}
}
// Payment Domain Events
@Data
public class PaymentProcessedEvent extends DomainEvent {
private String orderId;
private Double amount;
private String paymentMethod;
private Boolean success;
private String transactionId;
public PaymentProcessedEvent() {
super("PAYMENT_PROCESSED", null, "PAYMENT");
}
public PaymentProcessedEvent(String aggregateId, String orderId, Double amount,
String paymentMethod, Boolean success, String transactionId) {
super("PAYMENT_PROCESSED", aggregateId, "PAYMENT");
this.orderId = orderId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.success = success;
this.transactionId = transactionId;
}
}
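One caveat on the models above: amounts are typed as Double for brevity, but binary floating point drifts on money arithmetic, so production events usually carry BigDecimal (or minor units as long). A quick demonstration of the difference:

```java
import java.math.BigDecimal;

// Why Double is risky for money: binary floating point vs. exact decimals.
final class MoneyDemo {
    public static void main(String[] args) {
        System.out.println(0.1 + 0.2 == 0.3); // false (0.1 and 0.2 are not exact in binary)
        BigDecimal sum = new BigDecimal("0.1").add(new BigDecimal("0.2"));
        System.out.println(sum.compareTo(new BigDecimal("0.3")) == 0); // true (exact decimal math)
    }
}
```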
4. Redis Streams Service
Redis Configuration
package com.example.redisstreams.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Configuration
public class RedisConfig {
@Value("${spring.redis.host:localhost}")
private String redisHost;
@Value("${spring.redis.port:6379}")
private int redisPort;
@Value("${spring.redis.password:}")
private String redisPassword;
@Value("${spring.redis.timeout:2000}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-active:20}")
private int maxActive;
@Value("${spring.redis.jedis.pool.max-idle:10}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle:5}")
private int minIdle;
@Value("${spring.redis.jedis.pool.max-wait:3000}")
private long maxWait;
@Bean
public JedisPool jedisPool() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(maxActive);
poolConfig.setMaxIdle(maxIdle);
poolConfig.setMinIdle(minIdle);
poolConfig.setMaxWaitMillis(maxWait);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
if (redisPassword != null && !redisPassword.isEmpty()) {
return new JedisPool(poolConfig, redisHost, redisPort, timeout, redisPassword);
} else {
return new JedisPool(poolConfig, redisHost, redisPort, timeout);
}
}
}
Event Serialization Service
package com.example.redisstreams.serialization;
import com.example.redisstreams.events.DomainEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class EventSerializationService {
private static final Logger logger = LoggerFactory.getLogger(EventSerializationService.class);
private final ObjectMapper objectMapper;
public EventSerializationService() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
// Polymorphic (de)serialization is driven by @JsonTypeInfo on DomainEvent,
// which scopes type resolution to the event hierarchy. Global default typing
// (activateDefaultTyping with NON_FINAL) is intentionally avoided here: it is
// broader than needed and a well-known deserialization risk.
}
public String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
logger.error("Failed to serialize event: {}", event.getEventType(), e);
throw new SerializationException("Failed to serialize event", e);
}
}
public DomainEvent deserialize(String json) {
try {
return objectMapper.readValue(json, DomainEvent.class);
} catch (IOException e) {
logger.error("Failed to deserialize event JSON: {}", json, e);
throw new SerializationException("Failed to deserialize event", e);
}
}
public static class SerializationException extends RuntimeException {
public SerializationException(String message) {
super(message);
}
public SerializationException(String message, Throwable cause) {
super(message, cause);
}
}
}
Redis Streams Service
package com.example.redisstreams.service;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.serialization.EventSerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamGroupInfo;
import redis.clients.jedis.resps.StreamPendingEntry;
import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class RedisStreamsService {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamsService.class);
private final JedisPool jedisPool;
private final EventSerializationService serializationService;
@Value("${app.redis.streams.user-events:user:events}")
private String userEventsStream;
@Value("${app.redis.streams.order-events:order:events}")
private String orderEventsStream;
@Value("${app.redis.streams.payment-events:payment:events}")
private String paymentEventsStream;
@Value("${app.redis.streams.dlq-events:dlq:events}")
private String dlqStream;
@Value("${app.redis.consumer-group:event-processors}")
private String consumerGroup;
@Value("${app.redis.claim-timeout-ms:30000}")
private long claimTimeoutMs;
@Value("${app.redis.batch-size:100}")
private int batchSize;
public RedisStreamsService(JedisPool jedisPool, EventSerializationService serializationService) {
this.jedisPool = jedisPool;
this.serializationService = serializationService;
}
// @Value fields are injected after construction, so group creation must not run
// in the constructor (the stream names would still be null there).
@PostConstruct
public void initializeConsumerGroups() {
try (Jedis jedis = jedisPool.getResource()) {
createConsumerGroupIfNotExists(jedis, userEventsStream);
createConsumerGroupIfNotExists(jedis, orderEventsStream);
createConsumerGroupIfNotExists(jedis, paymentEventsStream);
createConsumerGroupIfNotExists(jedis, dlqStream);
}
}
private void createConsumerGroupIfNotExists(Jedis jedis, String stream) {
try {
// "0-0" starts the group at the beginning of the stream; the final argument
// creates the stream itself if it does not exist yet (MKSTREAM).
jedis.xgroupCreate(stream, consumerGroup, new StreamEntryID(), true);
logger.info("Created consumer group '{}' for stream '{}'", consumerGroup, stream);
} catch (Exception e) {
// Redis answers BUSYGROUP when the group already exists; anything else is fatal
if (e.getMessage() != null && e.getMessage().contains("BUSYGROUP")) {
logger.debug("Consumer group '{}' already exists for stream '{}'", consumerGroup, stream);
} else {
throw e;
}
}
}
// Event Publishing
public StreamEntryID publishEvent(String stream, DomainEvent event) {
try (Jedis jedis = jedisPool.getResource()) {
String eventJson = serializationService.serialize(event);
Map<String, String> fields = Map.of(
"eventType", event.getEventType(),
"aggregateId", event.getAggregateId(),
"aggregateType", event.getAggregateType(),
"eventData", eventJson,
"timestamp", event.getTimestamp().toString()
);
StreamEntryID entryId = jedis.xadd(stream, StreamEntryID.NEW_ENTRY, fields);
logger.debug("Published event {} to stream {} with ID {}",
event.getEventType(), stream, entryId);
return entryId;
} catch (Exception e) {
logger.error("Failed to publish event to stream {}: {}", stream, event.getEventType(), e);
throw new StreamOperationException("Failed to publish event", e);
}
}
public StreamEntryID publishUserEvent(DomainEvent event) {
return publishEvent(userEventsStream, event);
}
public StreamEntryID publishOrderEvent(DomainEvent event) {
return publishEvent(orderEventsStream, event);
}
public StreamEntryID publishPaymentEvent(DomainEvent event) {
return publishEvent(paymentEventsStream, event);
}
public StreamEntryID publishToDLQ(DomainEvent event, String originalStream,
StreamEntryID originalId, String error) {
try (Jedis jedis = jedisPool.getResource()) {
String eventJson = serializationService.serialize(event);
Map<String, String> fields = Map.of(
"eventType", event.getEventType(),
"aggregateId", event.getAggregateId(),
"originalStream", originalStream,
"originalId", originalId.toString(),
"error", error,
"eventData", eventJson,
"timestamp", event.getTimestamp().toString()
);
return jedis.xadd(dlqStream, StreamEntryID.NEW_ENTRY, fields);
} catch (Exception e) {
logger.error("Failed to publish event to DLQ: {}", event.getEventType(), e);
throw new StreamOperationException("Failed to publish to DLQ", e);
}
}
// Event Consumption
public List<StreamEvent> readEvents(String stream, String consumerName) {
try (Jedis jedis = jedisPool.getResource()) {
Map<String, StreamEntryID> streams = Map.of(stream, StreamEntryID.UNRECEIVED_ENTRY);
var params = XReadGroupParams.xReadGroupParams()
.block(5000) // 5 second block
.count(batchSize);
List<Map.Entry<String, List<StreamEntry>>> results =
jedis.xreadGroup(consumerGroup, consumerName, params, streams);
// A blocking XREADGROUP that times out returns null rather than an empty list
if (results == null || results.isEmpty()) {
return Collections.emptyList();
}
return results.get(0).getValue().stream()
.map(entry -> new StreamEvent(stream, entry.getID(), entry.getFields()))
.collect(Collectors.toList());
} catch (Exception e) {
logger.error("Failed to read events from stream {}: {}", stream, e.getMessage());
throw new StreamOperationException("Failed to read events", e);
}
}
public List<StreamEvent> readUserEvents(String consumerName) {
return readEvents(userEventsStream, consumerName);
}
public List<StreamEvent> readOrderEvents(String consumerName) {
return readEvents(orderEventsStream, consumerName);
}
public List<StreamEvent> readPaymentEvents(String consumerName) {
return readEvents(paymentEventsStream, consumerName);
}
// Acknowledgment
public void acknowledgeEvent(String stream, StreamEntryID entryId) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.xack(stream, consumerGroup, entryId);
logger.debug("Acknowledged event {} from stream {}", entryId, stream);
} catch (Exception e) {
logger.error("Failed to acknowledge event {} from stream {}", entryId, stream, e);
throw new StreamOperationException("Failed to acknowledge event", e);
}
}
// Claim stalled messages
public List<StreamEvent> claimStalledEvents(String stream, String consumerName) {
try (Jedis jedis = jedisPool.getResource()) {
// Inspect the group's pending entries list (PEL)
List<StreamPendingEntry> pending = jedis.xpending(
stream, consumerGroup, new XPendingParams().count(batchSize));
if (pending == null || pending.isEmpty()) {
return Collections.emptyList();
}
// Only entries idle longer than the claim timeout count as stalled
List<StreamEntryID> entryIds = pending.stream()
.filter(p -> p.getIdleTime() >= claimTimeoutMs)
.map(StreamPendingEntry::getID)
.collect(Collectors.toList());
if (entryIds.isEmpty()) {
return Collections.emptyList();
}
// XCLAIM reassigns those entries to this consumer for reprocessing
List<StreamEntry> claimed = jedis.xclaim(
stream, consumerGroup, consumerName, claimTimeoutMs,
XClaimParams.xClaimParams(), entryIds.toArray(new StreamEntryID[0]));
return claimed.stream()
.map(entry -> new StreamEvent(stream, entry.getID(), entry.getFields()))
.collect(Collectors.toList());
} catch (Exception e) {
logger.error("Failed to claim stalled events from stream {}: {}", stream, e.getMessage());
throw new StreamOperationException("Failed to claim stalled events", e);
}
}
// Stream information
public long getStreamLength(String stream) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.xlen(stream);
} catch (Exception e) {
logger.error("Failed to get stream length for {}: {}", stream, e.getMessage());
return -1;
}
}
public long getPendingCount(String stream) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.xpending(stream, consumerGroup).getTotalPending();
} catch (Exception e) {
logger.error("Failed to get pending count for stream {}: {}", stream, e.getMessage());
return -1;
}
}
public static class StreamEvent {
private final String stream;
private final StreamEntryID entryId;
private final Map<String, String> fields;
public StreamEvent(String stream, StreamEntryID entryId, Map<String, String> fields) {
this.stream = stream;
this.entryId = entryId;
this.fields = fields;
}
public String getStream() { return stream; }
public StreamEntryID getEntryId() { return entryId; }
public Map<String, String> getFields() { return fields; }
public String getEventType() { return fields.get("eventType"); }
public String getEventData() { return fields.get("eventData"); }
}
public static class StreamOperationException extends RuntimeException {
public StreamOperationException(String message) {
super(message);
}
public StreamOperationException(String message, Throwable cause) {
super(message, cause);
}
}
}
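The claiming logic in claimStalledEvents hinges on one rule: XCLAIM with a min-idle-time only reassigns pending entries that have been idle at least that long. A plain-Java sketch of that selection rule (no Redis involved; ClaimFilterDemo and its names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch (no Redis) of the selection rule behind XCLAIM with a min-idle-time:
// only pending entries idle at least that long are reassigned to a new consumer.
final class ClaimFilterDemo {
    record PendingEntry(String id, long idleMs) {}

    static List<String> claimable(List<PendingEntry> pending, long claimTimeoutMs) {
        return pending.stream()
                .filter(p -> p.idleMs() >= claimTimeoutMs) // stalled past the threshold
                .map(PendingEntry::id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PendingEntry> pending = List.of(
                new PendingEntry("1712345678901-0", 45_000),  // stuck for 45s
                new PendingEntry("1712345678901-1", 5_000));  // recently delivered
        System.out.println(claimable(pending, 30_000)); // [1712345678901-0]
    }
}
```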
5. Event Consumers
Event Consumer Interface
package com.example.redisstreams.consumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.service.RedisStreamsService;
public interface EventConsumer {
String getConsumerName();
String getStream();
void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent);
Class<? extends DomainEvent> getEventType();
default boolean supports(DomainEvent event) {
return getEventType().isInstance(event);
}
}
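The supports() default method dispatches purely on Class.isInstance, which a small self-contained sketch makes concrete (stand-in classes, no Spring or Redis; all names here are illustrative):

```java
// Stand-alone sketch of the supports() dispatch used by EventConsumer.
final class SupportsDemo {
    static class DomainEvent {}
    static class UserRegistered extends DomainEvent {}
    static class OrderCreated extends DomainEvent {}

    interface Consumer {
        Class<? extends DomainEvent> type();
        // Same shape as EventConsumer.supports(): a runtime instanceof check.
        default boolean supports(DomainEvent e) { return type().isInstance(e); }
    }

    public static void main(String[] args) {
        Consumer userConsumer = () -> UserRegistered.class;
        System.out.println(userConsumer.supports(new UserRegistered())); // true
        System.out.println(userConsumer.supports(new OrderCreated()));   // false
    }
}
```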
User Events Consumer
package com.example.redisstreams.consumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.events.UserEmailChangedEvent;
import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class UserEventsConsumer implements EventConsumer {
private static final Logger logger = LoggerFactory.getLogger(UserEventsConsumer.class);
private static final String CONSUMER_NAME = "user-events-processor";
private static final String STREAM = "user:events";
@Override
public String getConsumerName() {
return CONSUMER_NAME;
}
@Override
public String getStream() {
return STREAM;
}
@Override
public Class<? extends DomainEvent> getEventType() {
return DomainEvent.class; // Handle all user events
}
@Override
public void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent) {
try {
if (event instanceof UserRegisteredEvent) {
handleUserRegistered((UserRegisteredEvent) event);
} else if (event instanceof UserEmailChangedEvent) {
handleUserEmailChanged((UserEmailChangedEvent) event);
} else {
logger.warn("Unhandled user event type: {}", event.getEventType());
}
logger.info("Processed user event: {} for user {}",
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
logger.error("Failed to process user event: {}", event.getEventType(), e);
throw new EventProcessingException("Failed to process user event", e);
}
}
private void handleUserRegistered(UserRegisteredEvent event) {
// Business logic for user registration
logger.info("Processing user registration: {} ({})",
event.getUsername(), event.getEmail());
// Example: Update read model, send welcome email, etc.
// userReadModelService.update(event);
// emailService.sendWelcomeEmail(event.getEmail(), event.getFullName());
}
private void handleUserEmailChanged(UserEmailChangedEvent event) {
// Business logic for email change
logger.info("Processing email change: {} -> {} for user {}",
event.getOldEmail(), event.getNewEmail(), event.getAggregateId());
// Example: Update read model, send verification email, etc.
// userReadModelService.updateEmail(event.getAggregateId(), event.getNewEmail());
// emailService.sendEmailChangeNotification(event.getOldEmail(), event.getNewEmail());
}
public static class EventProcessingException extends RuntimeException {
public EventProcessingException(String message) {
super(message);
}
public EventProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
}
Order Events Consumer
package com.example.redisstreams.consumer;
import com.example.redisstreams.events.*;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class OrderEventsConsumer implements EventConsumer {
private static final Logger logger = LoggerFactory.getLogger(OrderEventsConsumer.class);
private static final String CONSUMER_NAME = "order-events-processor";
private static final String STREAM = "order:events";
@Override
public String getConsumerName() {
return CONSUMER_NAME;
}
@Override
public String getStream() {
return STREAM;
}
@Override
public Class<? extends DomainEvent> getEventType() {
return DomainEvent.class; // Handle all order events
}
@Override
public void processEvent(DomainEvent event, RedisStreamsService.StreamEvent streamEvent) {
try {
if (event instanceof OrderCreatedEvent) {
handleOrderCreated((OrderCreatedEvent) event);
} else if (event instanceof OrderItemAddedEvent) {
handleOrderItemAdded((OrderItemAddedEvent) event);
} else if (event instanceof OrderConfirmedEvent) {
handleOrderConfirmed((OrderConfirmedEvent) event);
} else {
logger.warn("Unhandled order event type: {}", event.getEventType());
}
logger.info("Processed order event: {} for order {}",
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
logger.error("Failed to process order event: {}", event.getEventType(), e);
throw new EventProcessingException("Failed to process order event", e);
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
logger.info("Processing order creation: {} for user {}",
event.getAggregateId(), event.getUserId());
// Update order read model
// orderReadModelService.createOrder(event);
// Trigger payment process
// paymentService.initiatePayment(event.getAggregateId(), event.getTotalAmount());
}
private void handleOrderItemAdded(OrderItemAddedEvent event) {
logger.info("Processing order item added: {} to order {}",
event.getProductId(), event.getAggregateId());
// Update order read model with new item
// orderReadModelService.addOrderItem(event.getAggregateId(), event);
}
private void handleOrderConfirmed(OrderConfirmedEvent event) {
logger.info("Processing order confirmation: {}", event.getAggregateId());
// Update order status in read model
// orderReadModelService.confirmOrder(event.getAggregateId());
// Notify inventory system
// inventoryService.reserveItemsForOrder(event.getAggregateId());
}
}
6. Event Consumer Manager
Event Consumer Manager
package com.example.redisstreams.manager;
import com.example.redisstreams.consumer.EventConsumer;
import com.example.redisstreams.events.DomainEvent;
import com.example.redisstreams.serialization.EventSerializationService;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class EventConsumerManager {
private static final Logger logger = LoggerFactory.getLogger(EventConsumerManager.class);
private final List<EventConsumer> eventConsumers;
private final RedisStreamsService streamsService;
private final EventSerializationService serializationService;
private final ExecutorService executorService;
private final AtomicBoolean running;
public EventConsumerManager(List<EventConsumer> eventConsumers,
RedisStreamsService streamsService,
EventSerializationService serializationService) {
this.eventConsumers = eventConsumers;
this.streamsService = streamsService;
this.serializationService = serializationService;
this.executorService = Executors.newFixedThreadPool(eventConsumers.size());
this.running = new AtomicBoolean(false);
}
@PostConstruct
public void start() {
logger.info("Starting EventConsumerManager with {} consumers", eventConsumers.size());
running.set(true);
for (EventConsumer consumer : eventConsumers) {
executorService.submit(() -> processEventsForConsumer(consumer));
}
}
@PreDestroy
public void stop() {
logger.info("Stopping EventConsumerManager");
running.set(false);
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
private void processEventsForConsumer(EventConsumer consumer) {
logger.info("Starting event processing for consumer: {}", consumer.getConsumerName());
while (running.get()) {
try {
List<RedisStreamsService.StreamEvent> streamEvents =
streamsService.readEvents(consumer.getStream(), consumer.getConsumerName());
if (streamEvents.isEmpty()) {
// No events available, sleep briefly
Thread.sleep(100);
continue;
}
for (RedisStreamsService.StreamEvent streamEvent : streamEvents) {
processSingleEvent(consumer, streamEvent);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Event processing interrupted for consumer: {}", consumer.getConsumerName());
break;
} catch (Exception e) {
logger.error("Error processing events for consumer {}: {}",
consumer.getConsumerName(), e.getMessage(), e);
try {
Thread.sleep(1000); // Backoff on error
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
logger.info("Stopped event processing for consumer: {}", consumer.getConsumerName());
}
private void processSingleEvent(EventConsumer consumer, RedisStreamsService.StreamEvent streamEvent) {
try {
String eventData = streamEvent.getEventData();
DomainEvent event = serializationService.deserialize(eventData);
if (consumer.supports(event)) {
consumer.processEvent(event, streamEvent);
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
logger.debug("Successfully processed and acknowledged event: {}", streamEvent.getEntryId());
} else {
logger.warn("Consumer {} does not support event type: {}",
consumer.getConsumerName(), event.getEventType());
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
}
} catch (Exception e) {
logger.error("Failed to process event {} from stream {}: {}",
streamEvent.getEntryId(), streamEvent.getStream(), e.getMessage(), e);
// Move to Dead Letter Queue
handleFailedEvent(consumer, streamEvent, e);
}
}
private void handleFailedEvent(EventConsumer consumer, RedisStreamsService.StreamEvent streamEvent, Exception error) {
try {
DomainEvent event = serializationService.deserialize(streamEvent.getEventData());
streamsService.publishToDLQ(
event,
streamEvent.getStream(),
streamEvent.getEntryId(),
error.getMessage()
);
// Acknowledge the original event to prevent reprocessing
streamsService.acknowledgeEvent(streamEvent.getStream(), streamEvent.getEntryId());
logger.warn("Moved failed event {} to DLQ", streamEvent.getEntryId());
} catch (Exception dlqError) {
logger.error("Failed to move event {} to DLQ: {}", streamEvent.getEntryId(), dlqError.getMessage());
// Don't acknowledge, so it will be retried
}
}
@Scheduled(fixedDelay = 30000) // Every 30 seconds; requires @EnableScheduling on a configuration class
public void processStalledEvents() {
if (!running.get()) {
return;
}
logger.debug("Checking for stalled events");
for (EventConsumer consumer : eventConsumers) {
try {
List<RedisStreamsService.StreamEvent> stalledEvents =
streamsService.claimStalledEvents(consumer.getStream(), consumer.getConsumerName());
if (!stalledEvents.isEmpty()) {
logger.info("Claimed {} stalled events for consumer {}",
stalledEvents.size(), consumer.getConsumerName());
for (RedisStreamsService.StreamEvent streamEvent : stalledEvents) {
processSingleEvent(consumer, streamEvent);
}
}
} catch (Exception e) {
logger.error("Error processing stalled events for consumer {}: {}",
consumer.getConsumerName(), e.getMessage());
}
}
}
@Scheduled(fixedRate = 60000) // Every minute
public void logConsumerStats() {
if (logger.isDebugEnabled()) {
for (EventConsumer consumer : eventConsumers) {
long streamLength = streamsService.getStreamLength(consumer.getStream());
long pendingCount = streamsService.getPendingCount(consumer.getStream());
logger.debug("Stream {}: length={}, pending={}",
consumer.getStream(), streamLength, pendingCount);
}
}
}
}
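The manager above sleeps a fixed second after every error; under a sustained Redis outage a capped exponential backoff is gentler. A sketch of such a helper (hypothetical, not part of the manager's code):

```java
// Hypothetical capped exponential backoff helper: doubles the delay per failed
// attempt, bounded by a maximum, so retries spread out under sustained failures.
final class BackoffDemo {
    static long backoffMs(int attempt, long baseMs, long capMs) {
        long delay = baseMs << Math.min(attempt, 20); // bounded shift avoids overflow
        return Math.min(delay, capMs);
    }

    public static void main(String[] args) {
        System.out.println(backoffMs(0, 1000, 30_000));  // 1000
        System.out.println(backoffMs(3, 1000, 30_000));  // 8000
        System.out.println(backoffMs(10, 1000, 30_000)); // 30000
    }
}
```

In processEventsForConsumer, the attempt counter would reset to zero after any successful read.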
7. REST Controllers
Event Publishing Controller
package com.example.redisstreams.web;
import com.example.redisstreams.events.*;
import com.example.redisstreams.service.RedisStreamsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
@RequestMapping("/api/events")
public class EventController {

    private static final Logger logger = LoggerFactory.getLogger(EventController.class);

    private final RedisStreamsService streamsService;

    public EventController(RedisStreamsService streamsService) {
        this.streamsService = streamsService;
    }

    @PostMapping("/users/register")
    public ResponseEntity<EventResponse> registerUser(@RequestBody UserRegistrationRequest request) {
        String userId = "USER-" + UUID.randomUUID();
        UserRegisteredEvent event = new UserRegisteredEvent(
                userId, request.username(), request.email(), request.fullName()
        );
        var entryId = streamsService.publishUserEvent(event);
        logger.info("Published user registration event: {} for user {}", entryId, userId);
        return ResponseEntity.ok(new EventResponse(userId, entryId.toString(), "User registered successfully"));
    }

    @PutMapping("/users/{userId}/email")
    public ResponseEntity<EventResponse> changeUserEmail(
            @PathVariable String userId,
            @RequestBody ChangeEmailRequest request) {
        UserEmailChangedEvent event = new UserEmailChangedEvent(
                userId, request.oldEmail(), request.newEmail()
        );
        var entryId = streamsService.publishUserEvent(event);
        logger.info("Published email change event: {} for user {}", entryId, userId);
        return ResponseEntity.ok(new EventResponse(userId, entryId.toString(), "Email change initiated"));
    }

    @PostMapping("/orders/create")
    public ResponseEntity<EventResponse> createOrder(@RequestBody CreateOrderRequest request) {
        String orderId = "ORDER-" + UUID.randomUUID();
        OrderCreatedEvent event = new OrderCreatedEvent(
                orderId, request.userId(), request.totalAmount(), request.currency()
        );
        var entryId = streamsService.publishOrderEvent(event);
        logger.info("Published order creation event: {} for order {}", entryId, orderId);
        return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Order created successfully"));
    }

    @PostMapping("/orders/{orderId}/items")
    public ResponseEntity<EventResponse> addOrderItem(
            @PathVariable String orderId,
            @RequestBody AddOrderItemRequest request) {
        OrderItemAddedEvent event = new OrderItemAddedEvent(
                orderId, request.productId(), request.productName(),
                request.quantity(), request.unitPrice()
        );
        var entryId = streamsService.publishOrderEvent(event);
        logger.info("Published order item event: {} for order {}", entryId, orderId);
        return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Item added to order"));
    }

    @PostMapping("/orders/{orderId}/confirm")
    public ResponseEntity<EventResponse> confirmOrder(@PathVariable String orderId) {
        OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
        var entryId = streamsService.publishOrderEvent(event);
        logger.info("Published order confirmation event: {} for order {}", entryId, orderId);
        return ResponseEntity.ok(new EventResponse(orderId, entryId.toString(), "Order confirmed"));
    }

    @PostMapping("/payments/process")
    public ResponseEntity<EventResponse> processPayment(@RequestBody ProcessPaymentRequest request) {
        String paymentId = "PAYMENT-" + UUID.randomUUID();
        PaymentProcessedEvent event = new PaymentProcessedEvent(
                paymentId, request.orderId(), request.amount(),
                request.paymentMethod(), request.success(), request.transactionId()
        );
        var entryId = streamsService.publishPaymentEvent(event);
        logger.info("Published payment processed event: {} for payment {}", entryId, paymentId);
        return ResponseEntity.ok(new EventResponse(paymentId, entryId.toString(), "Payment processed"));
    }

    // Request/Response DTOs
    public record UserRegistrationRequest(String username, String email, String fullName) {}
    public record ChangeEmailRequest(String oldEmail, String newEmail) {}
    public record CreateOrderRequest(String userId, Double totalAmount, String currency) {}
    public record AddOrderItemRequest(String productId, String productName, Integer quantity, Double unitPrice) {}
    public record ProcessPaymentRequest(String orderId, Double amount, String paymentMethod,
                                        Boolean success, String transactionId) {}
    public record EventResponse(String entityId, String eventId, String message) {}
}
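The `eventId` field in each `EventResponse` is a Redis stream entry ID of the form `<millisecondsTime>-<sequenceNumber>`, so clients can recover the event's creation time from the ID alone. A minimal sketch of parsing one (the `EntryId` record is illustrative, not part of the service):

```java
import java.time.Instant;

public class EntryIdDemo {
    // A Redis stream entry ID: <ms since epoch>-<sequence within that ms>.
    record EntryId(long millis, long sequence) {
        static EntryId parse(String raw) {
            int dash = raw.indexOf('-');
            return new EntryId(
                    Long.parseLong(raw.substring(0, dash)),
                    Long.parseLong(raw.substring(dash + 1)));
        }

        // The event's creation time as an Instant.
        Instant timestamp() {
            return Instant.ofEpochMilli(millis);
        }
    }

    public static void main(String[] args) {
        EntryId id = EntryId.parse("1526919030474-55");
        System.out.println(id.millis());   // 1526919030474
        System.out.println(id.sequence()); // 55
        System.out.println(id.timestamp());
    }
}
```

The sequence number disambiguates multiple entries added within the same millisecond, which is why entry IDs are totally ordered within a stream.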
Monitoring Controller
package com.example.redisstreams.web;

import com.example.redisstreams.service.RedisStreamsService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@RequestMapping("/api/monitoring")
public class MonitoringController {

    private final RedisStreamsService streamsService;

    public MonitoringController(RedisStreamsService streamsService) {
        this.streamsService = streamsService;
    }

    @GetMapping("/stats")
    public Map<String, Object> getStreamStats() {
        return Map.of(
                "userEventsStream", Map.of(
                        "length", streamsService.getStreamLength("user:events"),
                        "pending", streamsService.getPendingCount("user:events")
                ),
                "orderEventsStream", Map.of(
                        "length", streamsService.getStreamLength("order:events"),
                        "pending", streamsService.getPendingCount("order:events")
                ),
                "paymentEventsStream", Map.of(
                        "length", streamsService.getStreamLength("payment:events"),
                        "pending", streamsService.getPendingCount("payment:events")
                ),
                "dlqStream", Map.of(
                        "length", streamsService.getStreamLength("dlq:events"),
                        "pending", streamsService.getPendingCount("dlq:events")
                )
        );
    }
}
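The per-stream pending counts surfaced by this endpoint can feed a simple alerting rule: flag any stream whose pending count exceeds a threshold, since a growing pending list usually means consumers are falling behind or crashing without acknowledging. A hypothetical helper operating on a map shaped like the `/api/monitoring/stats` response (the method name and threshold are illustrative):

```java
import java.util.List;
import java.util.Map;

public class BacklogCheck {
    // Returns the names of streams whose pending count exceeds the threshold,
    // given a stats map shaped like the /api/monitoring/stats response.
    static List<String> streamsOverThreshold(Map<String, Map<String, Object>> stats, long threshold) {
        return stats.entrySet().stream()
                .filter(e -> ((Number) e.getValue().get("pending")).longValue() > threshold)
                .map(Map.Entry::getKey)
                .sorted()
                .toList();
    }

    public static void main(String[] args) {
        var stats = Map.of(
                "userEventsStream", Map.<String, Object>of("length", 120L, "pending", 3L),
                "orderEventsStream", Map.<String, Object>of("length", 500L, "pending", 250L));
        System.out.println(streamsOverThreshold(stats, 100)); // [orderEventsStream]
    }
}
```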
8. Testing
Integration Tests with Testcontainers
package com.example.redisstreams.test;

import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
@SpringBootTest
class RedisStreamsIntegrationTest {

    @Container
    static GenericContainer<?> redis = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
            .withExposedPorts(6379);

    // Register the container's host/port before the Spring context starts.
    // (Setting system properties in @BeforeEach would be too late: the
    // application context has already been created by then.)
    @DynamicPropertySource
    static void redisProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.redis.host", redis::getHost);
        registry.add("spring.redis.port", redis::getFirstMappedPort);
    }

    @Autowired
    private RedisStreamsService streamsService;

    @Test
    void testPublishAndReadEvent() {
        // Given
        String userId = "USER-" + UUID.randomUUID();
        UserRegisteredEvent event = new UserRegisteredEvent(userId, "testuser", "[email protected]", "Test User");

        // When
        var entryId = streamsService.publishUserEvent(event);

        // Then
        assertThat(entryId).isNotNull();

        // Read events (this would normally be done by the consumer)
        var events = streamsService.readUserEvents("test-consumer");
        assertThat(events).isNotEmpty();
    }
}
Unit Tests
package com.example.redisstreams.test;

import com.example.redisstreams.consumer.UserEventsConsumer;
import com.example.redisstreams.events.UserRegisteredEvent;
import com.example.redisstreams.service.RedisStreamsService;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatCode;

class UserEventsConsumerTest {

    @Test
    void testProcessUserRegisteredEvent() {
        // Given
        UserEventsConsumer consumer = new UserEventsConsumer();
        UserRegisteredEvent event = new UserRegisteredEvent("USER-123", "john", "[email protected]", "John Doe");
        RedisStreamsService.StreamEvent streamEvent =
                new RedisStreamsService.StreamEvent("user:events", null, null);

        // When / Then -- the handler should process the event without throwing.
        // With injected dependencies, you would also verify the interactions.
        assertThatCode(() -> consumer.processEvent(event, streamEvent))
                .doesNotThrowAnyException();
    }
}
9. Spring Boot Application
Main Application Class
package com.example.redisstreams;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class RedisStreamsApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisStreamsApplication.class, args);
    }
}
10. Performance Optimization and Best Practices
Configuration for High Throughput
# application-prod.yml
spring:
  redis:
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD:}
    timeout: 1000ms
    jedis:
      pool:
        max-active: 50
        max-idle: 20
        min-idle: 10
        max-wait: 1000ms

app:
  redis:
    batch-size: 200
    read-timeout-ms: 1000
    claim-timeout-ms: 60000  # 1 minute for production
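`claim-timeout-ms` governs when a pending message counts as stalled: only entries that have been idle (undelivered and unacknowledged) for at least that long should be claimed by another consumer, which mirrors the `min-idle-time` argument of Redis's `XAUTOCLAIM`. A pure-Java sketch of that eligibility rule, with hypothetical names:

```java
import java.util.List;
import java.util.Map;

public class ClaimEligibility {
    // An entry is eligible for claiming once its idle time (ms since last
    // delivery to a consumer) reaches the configured claim timeout.
    static List<String> eligibleForClaim(Map<String, Long> idleTimesMs, long claimTimeoutMs) {
        return idleTimesMs.entrySet().stream()
                .filter(e -> e.getValue() >= claimTimeoutMs)
                .map(Map.Entry::getKey)
                .sorted()
                .toList();
    }

    public static void main(String[] args) {
        var idle = Map.of(
                "1700000000000-0", 120_000L, // idle 2 min -> stalled, claimable
                "1700000000001-0", 5_000L);  // idle 5 s   -> still being worked on
        System.out.println(eligibleForClaim(idle, 60_000)); // [1700000000000-0]
    }
}
```

Setting the timeout too low risks duplicate processing of messages that are merely slow, which is why the production value here is a full minute.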
Monitoring and Metrics
package com.example.redisstreams.monitoring;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Component
public class EventMetrics {

    private final MeterRegistry meterRegistry;
    private final ConcurrentHashMap<String, Counter> eventCounters;
    private final Timer eventProcessingTimer;

    public EventMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.eventCounters = new ConcurrentHashMap<>();
        this.eventProcessingTimer = Timer.builder("event.processing.time")
                .description("Time taken to process events")
                .register(meterRegistry);
    }

    public void recordEventProcessed(String eventType, boolean success, long durationMs) {
        String status = success ? "success" : "failure";

        // Event counter by type and status
        Counter counter = eventCounters.computeIfAbsent(
                eventType + "." + status,
                key -> Counter.builder("event.processed")
                        .tag("type", eventType)
                        .tag("status", status)
                        .register(meterRegistry)
        );
        counter.increment();

        // Processing time
        eventProcessingTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordDlqEvent(String eventType) {
        Counter.builder("event.dlq")
                .tag("type", eventType)
                .register(meterRegistry)
                .increment();
    }
}
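A consumer would typically wrap each event handler with timing and a success flag before calling `recordEventProcessed`. A standalone sketch of that wrapper (`TimedResult` and `timed` are hypothetical helper names; in the real consumer you would pass the resulting pair straight to `EventMetrics`):

```java
public class TimedExecution {
    record TimedResult(boolean success, long durationMs) {}

    // Runs the handler, measures wall-clock duration, and reports whether it
    // completed without throwing -- the same (success, durationMs) pair that
    // EventMetrics.recordEventProcessed expects.
    static TimedResult timed(Runnable handler) {
        long start = System.nanoTime();
        boolean success = true;
        try {
            handler.run();
        } catch (RuntimeException ex) {
            success = false;
        }
        long durationMs = (System.nanoTime() - start) / 1_000_000;
        return new TimedResult(success, durationMs);
    }

    public static void main(String[] args) {
        TimedResult ok = timed(() -> {});
        TimedResult failed = timed(() -> { throw new IllegalStateException("boom"); });
        System.out.println(ok.success());     // true
        System.out.println(failed.success()); // false
    }
}
```

Recording failures as well as successes matters here: a rising `event.processed{status="failure"}` rate is usually the earliest signal that messages are about to start landing in the DLQ.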
Summary
This comprehensive Redis Streams implementation provides:
Key Features:
- Event Publishing: Reliable event publishing to Redis Streams
- Consumer Groups: Scalable event processing with multiple consumers
- Dead Letter Queue: Automatic handling of failed events
- Stalled Message Recovery: Automatic claiming of stalled messages
- Monitoring: Comprehensive monitoring and metrics
- Error Handling: Robust error handling and retry mechanisms
Benefits:
- Scalability: Horizontal scaling with consumer groups
- Durability: Persistent event storage with Redis
- Performance: High-throughput event processing
- Reliability: At-least-once delivery semantics
- Observability: Comprehensive monitoring and metrics
Use Cases:
- Event Sourcing and CQRS implementations
- Microservices communication
- Real-time data processing pipelines
- Audit logging and compliance
- Asynchronous workflow processing
This implementation covers the core building blocks of a production Redis Streams pipeline and can be extended with additional features such as event schema evolution, advanced monitoring, and integration with other systems.