Real-Time Analytics with Kafka Streams in Java

Kafka Streams is a client library for building real-time streaming applications and microservices that process data in Apache Kafka. Because it is an ordinary Java library rather than a separate processing cluster, it offers a simple, lightweight way to build real-time analytics, ETL pipelines, and event-driven applications.

Core Concepts

Key Kafka Streams Concepts

  • Stream: An unbounded, continuously updating data set
  • Table: A changelog stream where each record represents an update
  • KStream: Abstraction for record streams, where each record is an independent event
  • KTable: Abstraction for changelog streams, where each record updates the previous value for its key (see the sketch after this list)
  • GlobalKTable: Fully replicated table across all application instances
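
To make the stream/table distinction concrete, here is a minimal sketch (topic names are illustrative):

StreamsBuilder builder = new StreamsBuilder();

// As a stream: every record is an independent event; two clicks by "alice" are two records
KStream<String, String> clicks = builder.stream("user-clicks");

// As a table: records are upserts; the table keeps only the latest profile per user key
KTable<String, String> profiles = builder.table("user-profiles");

// A GlobalKTable is fully replicated to every instance, useful for small reference data
GlobalKTable<String, String> regions = builder.globalTable("region-codes");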

Basic Setup and Dependencies

Maven Dependencies

<properties>
    <kafka.version>3.4.0</kafka.version>
</properties>

<dependencies>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- For JSON serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.0</version>
    </dependency>
    <!-- For Avro serialization (optional) -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>7.3.0</version>
    </dependency>
</dependencies>
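
The examples below also use a JsonSerde<T> for JSON-backed state stores and repartition topics. Kafka Streams itself does not ship one; you can pull in spring-kafka's JsonSerde or write a small Jackson-based serde yourself. A minimal sketch of the latter (the class name and shape are mine, not from a library):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerde<T> implements Serde<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        // Serializer's configure/close have default implementations, so a lambda suffices
        return (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("JSON serialization failed", e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, targetType);
            } catch (Exception e) {
                throw new RuntimeException("JSON deserialization failed", e);
            }
        };
    }
}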

Basic Data Models

Example 1: Event Data Models

// User activity events
public class UserClickEvent {
private String userId;
private String pageUrl;
private String productId;
private String action; // "view", "click", "purchase"
private Long timestamp;
private String sessionId;
private Map<String, String> metadata;
// constructors, getters, setters
public UserClickEvent() {}
public UserClickEvent(String userId, String pageUrl, String productId, 
String action, Long timestamp, String sessionId) {
this.userId = userId;
this.pageUrl = pageUrl;
this.productId = productId;
this.action = action;
this.timestamp = timestamp;
this.sessionId = sessionId;
this.metadata = new HashMap<>();
}
// getters and setters
}
// E-commerce events
public class ProductViewEvent {
private String productId;
private String productName;
private String category;
private Double price;
private String userId;
private Long timestamp;
private Integer viewDuration; // in seconds
// constructors, getters, setters
}
public class PurchaseEvent {
private String orderId;
private String userId;
private String productId;
private Integer quantity;
private Double totalAmount;
private Long timestamp;
private String paymentMethod;
// constructors, getters, setters
}
// Aggregated results
public class UserSessionSummary {
private String userId;
private String sessionId;
private Long startTime;
private Long endTime;
private Integer pageViewCount;
private Integer productClickCount;
private Integer purchaseCount;
private Double totalRevenue;
private List<String> viewedProducts;
// constructors, getters, setters
}
public class ProductAnalytics {
private String productId;
private String productName;
private Long viewCount;
private Long purchaseCount;
private Double conversionRate;
private Double totalRevenue;
private Long lastUpdated;
// constructors, getters, setters
}
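
As a quick sanity check, these models serialize straight to the JSON consumed by the topologies below; a minimal sketch (field values are illustrative, and it assumes the getters and setters indicated by the placeholder comments exist):

public class EventJsonDemo {
    public static void main(String[] args) throws Exception {
        UserClickEvent event = new UserClickEvent(
                "user-123", "/products/42", "prod-42", "view",
                System.currentTimeMillis(), "session-abc");
        // Prints: {"userId":"user-123","pageUrl":"/products/42","productId":"prod-42",...}
        System.out.println(new ObjectMapper().writeValueAsString(event));
    }
}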

Basic Stream Processing Examples

Example 2: Simple Stream Processing

@Slf4j
public class BasicStreamProcessor {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String APPLICATION_ID = "basic-stream-processor";
private final ObjectMapper objectMapper = new ObjectMapper();
public Properties getStreamsConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Processing guarantees
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Consumer settings
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
public void startSimpleStream() {
Properties config = getStreamsConfig();
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> userClicks = builder.stream("user-clicks");
// Filter and transform events
KStream<String, String> filteredClicks = userClicks
.filter((key, value) -> {
try {
UserClickEvent event = objectMapper.readValue(value, UserClickEvent.class);
return "purchase".equals(event.getAction());
} catch (Exception e) {
log.warn("Failed to parse event: {}", value, e);
return false;
}
})
.mapValues(value -> {
try {
UserClickEvent event = objectMapper.readValue(value, UserClickEvent.class);
// Enrich event
event.getMetadata().put("processed_at", String.valueOf(System.currentTimeMillis()));
return objectMapper.writeValueAsString(event);
} catch (Exception e) {
log.error("Failed to process event", e);
return value;
}
});
// Write to output topic
filteredClicks.to("purchase-events");
// Start the streams application
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
// Start processing
streams.start();
log.info("Kafka Streams application started");
}
}
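
A minimal entry point for running this processor might look like the following (nothing here beyond the class above is assumed); the application keeps processing until the JVM receives a shutdown signal:

public class BasicStreamProcessorApp {
    public static void main(String[] args) {
        // Start the purchase-filtering stream defined above
        new BasicStreamProcessor().startSimpleStream();
    }
}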

Example 3: Real-Time User Session Analytics

@Slf4j
public class UserSessionAnalytics {
private static final String APPLICATION_ID = "user-session-analytics";
public Properties getStreamsConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Optimize for low latency
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
return props;
}
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
ObjectMapper objectMapper = new ObjectMapper();
// Source: User click events
KStream<String, String> clickEvents = builder.stream("user-click-events");
// Parse JSON events
KStream<String, UserClickEvent> parsedClicks = clickEvents
.mapValues(value -> {
try {
return objectMapper.readValue(value, UserClickEvent.class);
} catch (Exception e) {
log.error("Failed to parse click event", e);
return null;
}
})
.filter((key, event) -> event != null);
// Group by session ID for session analytics.
// groupBy changes the key, which triggers a repartition, so the event serde must be supplied.
KGroupedStream<String, UserClickEvent> bySession = parsedClicks
        .groupBy((key, event) -> event.getSessionId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(UserClickEvent.class)));
// Session window: 30 minutes of inactivity
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(
Duration.ofMinutes(30));
// Aggregate session data over the session window
KTable<Windowed<String>, UserSessionSummary> sessionSummaries = bySession
        .windowedBy(sessionWindow)
        .aggregate(
                UserSessionSummary::new,
                (sessionId, event, summary) -> updateSessionSummary(summary, event),
                (sessionId, agg1, agg2) -> mergeSessionSummaries(agg1, agg2),
                Materialized.with(Serdes.String(), new JsonSerde<>(UserSessionSummary.class))
        );
// Process session results (merged-away session windows surface as null tombstones, so filter them)
sessionSummaries
        .toStream()
        .filter((windowedKey, summary) -> summary != null)
        .map((windowedKey, summary) -> {
            // Convert the windowed key back to a plain session key
            String sessionId = windowedKey.key();
            summary.setSessionId(sessionId);
            summary.setEndTime(windowedKey.window().end());
            return KeyValue.pair(sessionId, summary);
        })
        .mapValues(summary -> {
            try {
                return objectMapper.writeValueAsString(summary);
            } catch (Exception e) {
                log.error("Failed to serialize session summary", e);
                return null;
            }
        })
        .filter((key, value) -> value != null)
        .to("user-session-summaries");
// Real-time metrics: count events per user per minute
KTable<Windowed<String>, Long> userActivityCounts = parsedClicks
        .groupBy((key, event) -> event.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(UserClickEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count(Materialized.as("user-activity-counts"));
userActivityCounts
.toStream()
.map((windowedKey, count) -> {
String userId = windowedKey.key();
long windowStart = windowedKey.window().start();
Map<String, Object> metric = Map.of(
"userId", userId,
"windowStart", windowStart,
"eventCount", count,
"windowSize", "1min"
);
return KeyValue.pair(userId, metric);
})
.mapValues(metric -> {
try {
return objectMapper.writeValueAsString(metric);
} catch (Exception e) {
return "{}";
}
})
.to("user-activity-metrics");
return builder.build();
}
private UserSessionSummary updateSessionSummary(UserSessionSummary summary, UserClickEvent event) {
    if (summary.getUserId() == null) {
        // First event for this session: initialize identifiers and counters,
        // otherwise the increments below would hit null Integer fields
        summary.setUserId(event.getUserId());
        summary.setSessionId(event.getSessionId());
        summary.setStartTime(event.getTimestamp());
        summary.setViewedProducts(new ArrayList<>());
        summary.setPageViewCount(0);
        summary.setProductClickCount(0);
        summary.setPurchaseCount(0);
        summary.setTotalRevenue(0.0);
    }
summary.setEndTime(event.getTimestamp());
switch (event.getAction()) {
case "view":
summary.setPageViewCount(summary.getPageViewCount() + 1);
if (event.getProductId() != null && 
!summary.getViewedProducts().contains(event.getProductId())) {
summary.getViewedProducts().add(event.getProductId());
}
break;
case "click":
summary.setProductClickCount(summary.getProductClickCount() + 1);
break;
case "purchase":
summary.setPurchaseCount(summary.getPurchaseCount() + 1);
// In a real scenario you'd look up the actual price from another source
summary.setTotalRevenue(summary.getTotalRevenue() + 99.99); // Placeholder price
break;
}
return summary;
}
private UserSessionSummary mergeSessionSummaries(UserSessionSummary agg1, UserSessionSummary agg2) {
UserSessionSummary merged = new UserSessionSummary();
merged.setUserId(agg1.getUserId());
merged.setSessionId(agg1.getSessionId());
merged.setStartTime(Math.min(agg1.getStartTime(), agg2.getStartTime()));
merged.setEndTime(Math.max(agg1.getEndTime(), agg2.getEndTime()));
merged.setPageViewCount(agg1.getPageViewCount() + agg2.getPageViewCount());
merged.setProductClickCount(agg1.getProductClickCount() + agg2.getProductClickCount());
merged.setPurchaseCount(agg1.getPurchaseCount() + agg2.getPurchaseCount());
merged.setTotalRevenue(agg1.getTotalRevenue() + agg2.getTotalRevenue());
// Merge viewed products
Set<String> allProducts = new HashSet<>();
allProducts.addAll(agg1.getViewedProducts());
allProducts.addAll(agg2.getViewedProducts());
merged.setViewedProducts(new ArrayList<>(allProducts));
return merged;
}
public void start() {
Properties config = getStreamsConfig();
Topology topology = buildTopology();
KafkaStreams streams = new KafkaStreams(topology, config);
// Set exception handler (the handler receives only the Throwable;
// the return value selects the recovery action)
streams.setUncaughtExceptionHandler(throwable -> {
    log.error("Uncaught exception in stream thread", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});
// Add state listener for monitoring
streams.setStateListener((newState, oldState) -> {
log.info("Streams state changed from {} to {}", oldState, newState);
});
// Start the application
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutting down streams application");
streams.close(Duration.ofSeconds(30));
}));
}
}
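
One refinement worth knowing: session windows emit an updated summary on every incoming event. If downstream consumers should see only one final record per session, the aggregate can be wrapped with suppress() before converting it to a stream. A minimal sketch reusing the sessionSummaries table from buildTopology() above:

// Emit each summary exactly once, after the 30-minute inactivity gap closes the window
sessionSummaries
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map((windowedKey, summary) -> KeyValue.pair(windowedKey.key(), summary));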

Example 4: E-commerce Real-Time Analytics

@Slf4j
public class EcommerceAnalyticsProcessor {
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
ObjectMapper objectMapper = new ObjectMapper();
// Source streams
KStream<String, String> productViews = builder.stream("product-view-events");
KStream<String, String> purchases = builder.stream("purchase-events");
// Parse events
KStream<String, ProductViewEvent> parsedViews = productViews
.mapValues(value -> parseEvent(value, ProductViewEvent.class, objectMapper))
.filter((key, event) -> event != null);
KStream<String, PurchaseEvent> parsedPurchases = purchases
.mapValues(value -> parseEvent(value, PurchaseEvent.class, objectMapper))
.filter((key, event) -> event != null);
// Real-time product analytics
processProductAnalytics(builder, parsedViews, parsedPurchases, objectMapper);
// User behavior analytics
processUserBehaviorAnalytics(builder, parsedViews, parsedPurchases, objectMapper);
// Category-level analytics
processCategoryAnalytics(builder, parsedViews, parsedPurchases, objectMapper);
return builder.build();
}
private void processProductAnalytics(StreamsBuilder builder,
KStream<String, ProductViewEvent> views,
KStream<String, PurchaseEvent> purchases,
ObjectMapper objectMapper) {
// Product view counts (5-minute tumbling windows)
KTable<Windowed<String>, Long> productViewCounts = views
        .groupBy((key, event) -> event.getProductId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(ProductViewEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count(Materialized.as("product-view-counts"));
// Product purchase counts over the same windows
KTable<Windowed<String>, Long> productPurchaseCounts = purchases
        .groupBy((key, event) -> event.getProductId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count(Materialized.as("product-purchase-counts"));
// Join views and purchases to calculate conversion rate.
// Both tables use the same 5-minute windows, so their Windowed<String> keys line up.
KTable<Windowed<String>, ProductAnalytics> productAnalytics = productViewCounts
        .outerJoin(
                productPurchaseCounts,
                (viewCount, purchaseCount) -> {
                    ProductAnalytics analytics = new ProductAnalytics();
                    analytics.setViewCount(viewCount != null ? viewCount : 0L);
                    analytics.setPurchaseCount(purchaseCount != null ? purchaseCount : 0L);
                    analytics.setConversionRate(
                            analytics.getViewCount() > 0
                                    ? (double) analytics.getPurchaseCount() / analytics.getViewCount()
                                    : 0.0);
                    analytics.setLastUpdated(System.currentTimeMillis());
                    return analytics;
                }
        );
// Output product analytics
productAnalytics
.toStream()
.map((windowedKey, analytics) -> {
analytics.setProductId(windowedKey.key());
return KeyValue.pair(windowedKey.key(), analytics);
})
.mapValues(analytics -> serialize(analytics, objectMapper))
.to("product-analytics");
}
private void processUserBehaviorAnalytics(StreamsBuilder builder,
KStream<String, ProductViewEvent> views,
KStream<String, PurchaseEvent> purchases,
ObjectMapper objectMapper) {
// User purchase frequency (1-hour tumbling windows)
KTable<Windowed<String>, Long> userPurchaseCounts = purchases
        .groupBy((key, event) -> event.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
        .count(Materialized.as("user-purchase-counts"));
// User lifetime value (total spent across all purchases, materialized for interactive queries)
KTable<String, Double> userLifetimeValue = purchases
        .groupBy((key, event) -> event.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .aggregate(
                () -> 0.0,
                (userId, purchase, total) -> total + purchase.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
        );
// Output user metrics
userPurchaseCounts
.toStream()
.map((windowedKey, count) -> {
Map<String, Object> metric = Map.of(
"userId", windowedKey.key(),
"purchaseCount", count,
"windowStart", windowedKey.window().start(),
"metricType", "purchase_frequency"
);
return KeyValue.pair(windowedKey.key(), metric);
})
.mapValues(metric -> serialize(metric, objectMapper))
.to("user-behavior-metrics");
}
private void processCategoryAnalytics(StreamsBuilder builder,
KStream<String, ProductViewEvent> views,
KStream<String, PurchaseEvent> purchases,
ObjectMapper objectMapper) {
// Category-level view counts
KTable<Windowed<String>, Long> categoryViewCounts = views
        .filter((key, event) -> event.getCategory() != null)
        .groupBy((key, event) -> event.getCategory(),
                Grouped.with(Serdes.String(), new JsonSerde<>(ProductViewEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
        .count(Materialized.as("category-view-counts"));
// Category revenue
KTable<Windowed<String>, Double> categoryRevenue = purchases
        .filter((key, event) -> event.getProductId() != null)
        .groupBy((key, event) -> {
                    // In a real implementation you'd join against a product catalog to resolve the category
                    return "electronics"; // Simplified placeholder
                },
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
        .aggregate(
                () -> 0.0,
                (category, purchase, total) -> total + purchase.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
        );
// Output category analytics
categoryViewCounts
.toStream()
.map((windowedKey, count) -> {
Map<String, Object> metric = Map.of(
"category", windowedKey.key(),
"viewCount", count,
"windowStart", windowedKey.window().start(),
"metricType", "category_views"
);
return KeyValue.pair(windowedKey.key(), metric);
})
.mapValues(metric -> serialize(metric, objectMapper))
.to("category-analytics");
}
private <T> T parseEvent(String value, Class<T> clazz, ObjectMapper objectMapper) {
try {
return objectMapper.readValue(value, clazz);
} catch (Exception e) {
log.warn("Failed to parse {} event: {}", clazz.getSimpleName(), value, e);
return null;
}
}
private String serialize(Object object, ObjectMapper objectMapper) {
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
log.error("Failed to serialize object", e);
return "{}";
}
}
}
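
Because the counts above are materialized under explicit store names (for example "product-view-counts"), they can also be served directly out of the running application via interactive queries. A sketch, assuming a handle to the running KafkaStreams instance and an illustrative product ID:

// Query the windowed "product-view-counts" store from the running application
ReadOnlyWindowStore<String, Long> viewCounts = streams.store(
        StoreQueryParameters.fromNameAndType(
                "product-view-counts",
                QueryableStoreTypes.windowStore()));

Instant now = Instant.now();
try (WindowStoreIterator<Long> iter =
             viewCounts.fetch("prod-42", now.minus(Duration.ofHours(1)), now)) {
    while (iter.hasNext()) {
        KeyValue<Long, Long> entry = iter.next(); // entry.key is the window start timestamp
        System.out.printf("window %d -> %d views%n", entry.key, entry.value);
    }
}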

Example 5: Advanced Stream Processing with Joins and State Stores

@Slf4j
public class AdvancedStreamProcessor {
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
ObjectMapper objectMapper = new ObjectMapper();
// Note: no explicit StoreBuilder is needed here. The user-profile KTable below
// materializes its own state store named "user-profiles"; adding a second store
// with the same name would make the topology fail to build.
// Source streams
KStream<String, String> userEvents = builder.stream("user-events");
KStream<String, String> purchaseEvents = builder.stream("purchase-events");
KStream<String, String> userProfileUpdates = builder.stream("user-profile-updates");
// Process user profiles into a latest-value table
KTable<String, UserProfile> userProfiles = userProfileUpdates
        .mapValues(value -> parseEvent(value, UserProfile.class, objectMapper))
        .filter((key, profile) -> profile != null)
        .groupBy((key, profile) -> profile.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(UserProfile.class)))
        .reduce(
                (aggValue, newValue) -> newValue, // Always keep the latest update
                Materialized.<String, UserProfile, KeyValueStore<Bytes, byte[]>>as("user-profiles")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(new JsonSerde<>(UserProfile.class))
        );
// Enrich purchase events with user profiles.
// The stream must be keyed by userId for the join to match the profile table.
KStream<String, String> enrichedPurchases = purchaseEvents
        .mapValues(value -> parseEvent(value, PurchaseEvent.class, objectMapper))
        .filter((key, purchase) -> purchase != null)
        .selectKey((key, purchase) -> purchase.getUserId())
        .leftJoin(
                userProfiles,
                (purchase, profile) -> {
                    EnrichedPurchase enriched = new EnrichedPurchase(purchase);
                    if (profile != null) {
                        enriched.setUserTier(profile.getTier());
                        enriched.setUserRegion(profile.getRegion());
                        enriched.setLoyaltyPoints(profile.getLoyaltyPoints());
                    }
                    return enriched;
                },
                // selectKey triggers a repartition, so the join serdes must be supplied
                Joined.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class),
                        new JsonSerde<>(UserProfile.class))
        )
        .mapValues(enriched -> serialize(enriched, objectMapper));
enrichedPurchases.to("enriched-purchase-events");
// Real-time fraud detection
processFraudDetection(builder, purchaseEvents, objectMapper);
// Real-time recommendation engine
processRecommendations(builder, userEvents, purchaseEvents, objectMapper);
return builder.build();
}
private void processFraudDetection(StreamsBuilder builder,
KStream<String, String> purchaseEvents,
ObjectMapper objectMapper) {
// Detect high-frequency purchases from the same user (5-minute windows)
KTable<Windowed<String>, Long> userPurchaseFrequency = purchaseEvents
        .mapValues(value -> parseEvent(value, PurchaseEvent.class, objectMapper))
        .filter((key, purchase) -> purchase != null)
        .groupBy((key, purchase) -> purchase.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count(Materialized.as("user-purchase-frequency"));
// Detect high aggregate purchase amounts (same 5-minute windows so the join below lines up)
KTable<Windowed<String>, Double> userPurchaseAmounts = purchaseEvents
        .mapValues(value -> parseEvent(value, PurchaseEvent.class, objectMapper))
        .filter((key, purchase) -> purchase != null)
        .groupBy((key, purchase) -> purchase.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(PurchaseEvent.class)))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .aggregate(
                () -> 0.0,
                (userId, purchase, total) -> total + purchase.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
        );
// Generate fraud alerts by joining the two windowed tables (keys and windows match)
userPurchaseFrequency
        .join(
                userPurchaseAmounts,
                (frequency, amount) -> Map.<String, Object>of(
                        "frequency", frequency,
                        "amount", amount,
                        "riskScore", calculateRiskScore(frequency, amount)
                )
        )
        .toStream()
        .filter((windowedKey, riskData) -> {
            Long frequency = (Long) riskData.get("frequency");
            Double amount = (Double) riskData.get("amount");
            return frequency > 10 || amount > 1000.0; // Alert thresholds
        })
        .map((windowedKey, riskData) -> {
            String userId = windowedKey.key();
            FraudAlert alert = new FraudAlert(
                    userId,
                    "suspicious_activity",
                    riskData,
                    System.currentTimeMillis()
            );
            return KeyValue.pair(userId, alert);
        })
        .mapValues(alert -> serialize(alert, objectMapper))
        .to("fraud-alerts");
}
private void processRecommendations(StreamsBuilder builder,
KStream<String, String> userEvents,
KStream<String, String> purchaseEvents,
ObjectMapper objectMapper) {
// Track the most recently viewed products per user.
// JsonSerde is generic over Class<T>, so the list serde needs a small unchecked cast.
@SuppressWarnings({"unchecked", "rawtypes"})
Serde<ArrayList<String>> recentViewsSerde =
        (Serde<ArrayList<String>>) (Serde) new JsonSerde<>(ArrayList.class);
KTable<String, ArrayList<String>> userRecentViews = userEvents
        .mapValues(value -> parseEvent(value, UserClickEvent.class, objectMapper))
        .filter((key, event) -> event != null && "view".equals(event.getAction()))
        .groupBy((key, event) -> event.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(UserClickEvent.class)))
        .aggregate(
                ArrayList::new,
                (userId, event, recentViews) -> {
                    // Keep only the 10 most recently viewed products
                    recentViews.add(0, event.getProductId());
                    if (recentViews.size() > 10) {
                        recentViews = new ArrayList<>(recentViews.subList(0, 10));
                    }
                    return recentViews;
                },
                Materialized.with(Serdes.String(), recentViewsSerde)
        );
// Generate real-time recommendations (use map rather than mapValues so the user ID key is available)
userRecentViews
        .toStream()
        .map((userId, recentViews) -> {
            // Simplified recommendation logic;
            // in production this would call an ML model or collaborative filtering
            Map<String, Object> recommendations = Map.of(
                    "userId", userId,
                    "recommendedProducts", generateRecommendations(recentViews),
                    "timestamp", System.currentTimeMillis()
            );
            return KeyValue.pair(userId, recommendations);
        })
        .mapValues(recommendations -> serialize(recommendations, objectMapper))
        .to("real-time-recommendations");
}
private double calculateRiskScore(Long frequency, Double amount) {
double freqScore = Math.min(frequency / 5.0, 10.0); // Normalize frequency
double amountScore = Math.min(amount / 500.0, 10.0); // Normalize amount
return (freqScore + amountScore) / 2.0;
}
private List<String> generateRecommendations(List<String> recentViews) {
// Simplified recommendation logic
// In real implementation, this would query a recommendation service
return List.of("prod-rec-1", "prod-rec-2", "prod-rec-3");
}
private <T> T parseEvent(String value, Class<T> clazz, ObjectMapper objectMapper) {
try {
return objectMapper.readValue(value, clazz);
} catch (Exception e) {
log.warn("Failed to parse event", e);
return null;
}
}
private String serialize(Object object, ObjectMapper objectMapper) {
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
log.error("Failed to serialize object", e);
return "{}";
}
}
}
// Supporting classes
class UserProfile {
private String userId;
private String tier; // "basic", "premium", "vip"
private String region;
private Integer loyaltyPoints;
private Long lastUpdated;
// constructors, getters, setters
}
class EnrichedPurchase {
private PurchaseEvent purchase;
private String userTier;
private String userRegion;
private Integer loyaltyPoints;
public EnrichedPurchase(PurchaseEvent purchase) {
this.purchase = purchase;
}
// getters, setters
}
class FraudAlert {
private String userId;
private String alertType;
private Map<String, Object> riskData;
private Long timestamp;
public FraudAlert(String userId, String alertType, Map<String, Object> riskData, Long timestamp) {
this.userId = userId;
this.alertType = alertType;
this.riskData = riskData;
this.timestamp = timestamp;
}
// getters, setters
}

Example 6: Monitoring and Management

@Slf4j
public class ManagedStreamsApplication {
private KafkaStreams streams;
private final CountDownLatch latch = new CountDownLatch(1);
public void start() {
Properties config = getStreamsConfig();
Topology topology = buildTopology();
streams = new KafkaStreams(topology, config);
// Configure monitoring and management
configureStreamsMonitoring();
// Start the application
streams.start();
// Wait for shutdown signal
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void configureStreamsMonitoring() {
// State listener
streams.setStateListener((newState, oldState) -> {
log.info("Streams state changed from {} to {}", oldState, newState);
if (newState == KafkaStreams.State.RUNNING) {
log.info("Streams application is now running");
} else if (newState == KafkaStreams.State.ERROR) {
log.error("Streams application entered ERROR state");
}
});
// Exception handler (receives only the Throwable; the return value picks the recovery action)
streams.setUncaughtExceptionHandler(throwable -> {
    log.error("Uncaught exception in stream thread", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
// Kafka Streams exposes its metrics as a map; they are also registered with JMX automatically
Map<MetricName, ? extends Metric> metrics = streams.metrics();
log.info("{} JMX metrics available for monitoring", metrics.size());
// Custom health check endpoint
startHealthCheckServer();
}
private void startHealthCheckServer() {
// Simple HTTP health check server
new Thread(() -> {
try {
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/health", exchange -> {
String response = getHealthStatus();
exchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
});
server.createContext("/metrics", exchange -> {
String response = getMetrics();
exchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
});
server.setExecutor(null);
server.start();
log.info("Health check server started on port 8080");
} catch (IOException e) {
log.error("Failed to start health check server", e);
}
}).start();
}
private String getHealthStatus() {
KafkaStreams.State state = streams.state();
boolean healthy = state == KafkaStreams.State.RUNNING || 
state == KafkaStreams.State.REBALANCING;
Map<String, Object> health = Map.of(
"status", healthy ? "HEALTHY" : "UNHEALTHY",
"state", state.toString(),
"threads", streams.metadataForLocalThreads().size(),
"timestamp", System.currentTimeMillis()
);
try {
return new ObjectMapper().writeValueAsString(health);
} catch (Exception e) {
return "{\"status\": \"ERROR\"}";
}
}
private String getMetrics() {
Map<String, Object> metrics = new HashMap<>();
streams.metrics().forEach((name, metric) -> {
if (metric.metricName().name().contains("rate") || 
metric.metricName().name().contains("count")) {
metrics.put(metric.metricName().name(), metric.metricValue());
}
});
try {
return new ObjectMapper().writeValueAsString(metrics);
} catch (Exception e) {
return "{}";
}
}
public void stop() {
if (streams != null) {
streams.close(Duration.ofSeconds(30));
latch.countDown();
}
}
private Properties getStreamsConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "managed-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
return props;
}
private Topology buildTopology() {
// Your topology implementation
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");
return builder.build();
}
}

Best Practices for Kafka Streams

1. Configuration Optimization

public Properties getOptimizedConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Processing guarantees
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Performance tuning
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // More frequent commits
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB cache
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
// Consumer/producer settings
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // Low latency
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// State store settings
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000L);
return props;
}
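
When scaling out, two more settings are worth knowing. A sketch of values (illustrative, to be added alongside the other settings in getOptimizedConfig()):

// Run multiple processing threads per instance; effective parallelism is capped
// by the number of input topic partitions
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Keep a warm standby copy of each state store on another instance for faster failover
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);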

2. Error Handling and Resilience

public class ResilientStreamProcessor {
public KStream<String, String> buildResilientStream(StreamsBuilder builder) {
    KStream<String, String> source = builder.stream("input-topic");
    return source
            .flatMapValues(value -> {
                try {
                    // Attempt to process the value
                    return List.of(processValue(value));
                } catch (Exception e) {
                    log.error("Failed to process value, skipping record", e);
                    // Returning an empty list drops the record;
                    // see the dead-letter-queue sketch after this class for routing it instead
                    return List.<String>of();
                }
            })
            .map((key, value) -> {
                try {
                    return KeyValue.pair(key, transformValue(value));
                } catch (Exception e) {
                    log.warn("Transform failed for key: {}", key, e);
                    return KeyValue.pair(key, value); // Pass through the original value
                }
            });
}
// Use intermediate topics to break complex pipelines into restartable steps.
// Note that to() is terminal, so each step re-reads from the intermediate topic.
public void buildModularTopology(StreamsBuilder builder) {
    builder.<String, String>stream("input-topic")
            .filter((key, value) -> isValid(value))
            .to("step1-output");
    builder.<String, String>stream("step1-output")
            .mapValues(value -> transformStep2(value))
            .to("step2-output");
    builder.<String, String>stream("step2-output")
            .filter((key, value) -> !value.isEmpty())
            .to("final-output");
}
}
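
The flatMapValues branch above simply drops bad records. A common refinement is to split the stream and route failures to a dead-letter topic for later inspection or replay. A sketch, assuming String payloads, the same isValid helper as above, and an illustrative topic name "input-topic-dlq":

public void buildStreamWithDlq(StreamsBuilder builder) {
    KStream<String, String> source = builder.stream("input-topic");

    // split() routes each record to exactly one named branch;
    // branch names get the "route-" prefix from Named.as
    Map<String, KStream<String, String>> branches = source
            .split(Named.as("route-"))
            .branch((key, value) -> isValid(value), Branched.as("ok"))
            .defaultBranch(Branched.as("dlq"));

    branches.get("route-ok").to("output-topic");
    branches.get("route-dlq").to("input-topic-dlq"); // Inspect or replay these later
}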

Testing Kafka Streams Applications

Example 7: Unit Testing

class KafkaStreamsTest {
@Test
void testStreamTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Build the topology (a type witness is needed because the chain is not assigned to a typed variable)
builder.<String, String>stream("input-topic")
        .filter((key, value) -> value != null)
        .mapValues(value -> value.toUpperCase())
        .to("output-topic");
Topology topology = builder.build();
// The topology relies on default serdes, so the driver must be configured with them
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Test the topology
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
TestInputTopic<String, String> inputTopic = 
testDriver.createInputTopic("input-topic", 
Serdes.String().serializer(), 
Serdes.String().serializer());
TestOutputTopic<String, String> outputTopic = 
testDriver.createOutputTopic("output-topic",
Serdes.String().deserializer(),
Serdes.String().deserializer());
// Send test data
inputTopic.pipeInput("key1", "hello");
inputTopic.pipeInput("key2", "world");
// Verify results
assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("key1", "HELLO"));
assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("key2", "WORLD"));
assertThat(outputTopic.isEmpty()).isTrue();
}
}
}
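
Windowed logic is just as testable: TestInputTopic.pipeInput accepts an explicit timestamp, which makes window assignment deterministic. A sketch of an additional test method for the class above (topic names are illustrative; explicit serdes are used so no default-serde config is required):

@Test
void testWindowedCount() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count()
            .toStream()
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count.toString()))
            .to("click-counts", Produced.with(Serdes.String(), Serdes.String()));
    try (TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
        TestInputTopic<String, String> input = driver.createInputTopic(
                "clicks", Serdes.String().serializer(), Serdes.String().serializer());
        TestOutputTopic<String, String> output = driver.createOutputTopic(
                "click-counts", Serdes.String().deserializer(), Serdes.String().deserializer());
        Instant base = Instant.parse("2024-01-01T00:00:00Z");
        input.pipeInput("user1", "click", base);
        input.pipeInput("user1", "click", base.plusSeconds(30)); // Same 1-minute window
        // The driver processes synchronously with caching disabled, so each update is emitted
        assertThat(output.readValue()).isEqualTo("1");
        assertThat(output.readValue()).isEqualTo("2");
    }
}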

Conclusion

Kafka Streams provides a powerful framework for building real-time analytics applications:

Key Benefits:

  • Exactly-once processing guarantees
  • Fault-tolerant stateful processing
  • Scalable and distributed processing
  • No external dependencies beyond Kafka

Common Use Cases:

  • Real-time user behavior analytics
  • Fraud detection and anomaly detection
  • Real-time recommendations
  • IoT data processing
  • E-commerce analytics

Best Practices:

  • Use appropriate windowing strategies for your use case
  • Implement proper error handling and dead letter queues
  • Monitor streams application health and performance
  • Test thoroughly with TopologyTestDriver
  • Optimize configuration for your latency and throughput requirements

Kafka Streams enables you to build sophisticated real-time analytics pipelines that can process massive volumes of data with low latency and high reliability.
