Kafka Streams for Data Processing in Java

Overview

Kafka Streams is a client library for building applications and microservices that process data stored in Apache Kafka. It provides a simple and lightweight way to perform stateful and stateless processing on real-time data streams.

Key Concepts

  • Stream: An unbounded, continuously updating data set
  • Table: A changelog stream where each record represents an update
  • Topology: A graph of stream processors (nodes) and their connections (edges)
  • KStream: Abstraction for record streams
  • KTable: Abstraction for changelog streams
  • GlobalKTable: Replicated table across all application instances
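
As a quick orientation, the sketch below shows how each of these abstractions is declared from a StreamsBuilder. It is schematic rather than a complete program, and the topic names are placeholders, not part of the examples that follow.

StreamsBuilder builder = new StreamsBuilder();

// KStream: each record is an independent event in an unbounded stream
KStream<String, String> clicks = builder.stream("clicks-topic");

// KTable: records with the same key are treated as updates to a single row
KTable<String, String> userProfiles = builder.table("user-profiles-topic");

// GlobalKTable: fully replicated to every application instance (useful for small lookup data)
GlobalKTable<String, String> countryNames = builder.globalTable("country-codes-topic");

// The topology is the graph of processors the builder has assembled
Topology topology = builder.build();
System.out.println(topology.describe());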

Setup and Dependencies

Maven Dependencies

<properties>
  <kafka.version>3.4.0</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>

Basic Configuration

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class KafkaStreamsConfig {

    public static Properties getStreamsConfig(String applicationId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Processing guarantees
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // State directory
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        return props;
    }
}
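
For brevity, the remaining examples omit their import statements. They assume roughly the following imports in addition to the ones above (the testing example also needs JUnit 5):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;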

Basic Stream Processing Examples

Example 1: Simple Stream Processing

public class SimpleStreamProcessor {

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("simple-stream-processor");

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> sourceStream = builder.stream("input-topic");

        // Process records - convert values to uppercase
        KStream<String, String> transformedStream = sourceStream
            .peek((key, value) -> System.out.println("Processing: key=" + key + ", value=" + value))
            .mapValues(value -> value.toUpperCase())
            .filter((key, value) -> value.length() > 5);

        // Write to output topic
        transformedStream.to("output-topic");

        // Start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}

Example 2: Word Count Application

public class WordCountStreamsApp {

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("word-count-app");

        StreamsBuilder builder = new StreamsBuilder();

        // Read from text lines topic
        KStream<String, String> textLines = builder.stream("text-lines-topic");

        // Word count logic
        KTable<String, Long> wordCounts = textLines
            // Split each text line into words
            .flatMapValues(textLine ->
                Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Group by word
            .groupBy((key, word) -> word)
            // Count occurrences
            .count(Materialized.as("word-counts-store"));

        // Write to output topic
        wordCounts
            .toStream()
            .to("word-counts-topic",
                Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        // Add state listener for monitoring
        streams.setStateListener((newState, oldState) -> {
            System.out.println("State changed from " + oldState + " to " + newState);
        });

        streams.start();
    }
}

Stateful Processing
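
The remaining examples use a JsonSerde<T> that is never defined in this post. It could be Spring Kafka's org.springframework.kafka.support.serializer.JsonSerde, or a small Jackson-based implementation along the lines of the sketch below (a minimal sketch, assuming jackson-databind is on the classpath; it is only one possible implementation):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerde<T> implements Serde<T> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    // No-arg constructor so the class can be registered as a default serde;
    // deserialization then needs another source of type information (e.g. type headers)
    public JsonSerde() {
        this(null);
    }

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("JSON serialization failed", e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, targetType);
            } catch (Exception e) {
                throw new RuntimeException("JSON deserialization failed", e);
            }
        };
    }
}

Whichever implementation is chosen, note that registering JsonSerde.class as the default value serde (as some examples below do) requires the serde to work without an explicit target type, for example by carrying type information in record headers.
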

Example 3: Customer Session Analysis

public class CustomerSessionAnalysis {

    // Custom value classes
    public static class CustomerEvent {
        private String customerId;
        private String eventType; // "VIEW", "PURCHASE", "CLICK"
        private String productId;
        private Double amount;
        private Long timestamp;

        // Constructors, getters, setters
        public CustomerEvent() {}

        public CustomerEvent(String customerId, String eventType, String productId, Double amount, Long timestamp) {
            this.customerId = customerId;
            this.eventType = eventType;
            this.productId = productId;
            this.amount = amount;
            this.timestamp = timestamp;
        }

        // Getters and setters...
    }

    public static class CustomerSession {
        private String customerId;
        private Long startTime;
        private Long endTime;
        private Long durationMinutes;
        private Integer eventCount = 0;
        private Double totalAmount = 0.0;
        private Set<String> viewedProducts = new HashSet<>();
        private Set<String> purchasedProducts = new HashSet<>();

        // Methods to update session
        public void addEvent(CustomerEvent event) {
            this.eventCount++;
            if ("PURCHASE".equals(event.getEventType()) && event.getAmount() != null) {
                this.totalAmount += event.getAmount();
                this.purchasedProducts.add(event.getProductId());
            } else if ("VIEW".equals(event.getEventType())) {
                this.viewedProducts.add(event.getProductId());
            }
            if (this.startTime == null || event.getTimestamp() < this.startTime) {
                this.startTime = event.getTimestamp();
            }
            if (this.endTime == null || event.getTimestamp() > this.endTime) {
                this.endTime = event.getTimestamp();
            }
        }

        // Getters and setters...
    }

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("customer-session-analysis");

        // Configure custom serdes (a JSON serde such as the JsonSerde sketched above)
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);

        StreamsBuilder builder = new StreamsBuilder();

        // Create custom serde for CustomerEvent
        final JsonSerde<CustomerEvent> customerEventSerde = new JsonSerde<>(CustomerEvent.class);

        KStream<String, CustomerEvent> customerEvents = builder.stream(
            "customer-events-topic",
            Consumed.with(Serdes.String(), customerEventSerde)
        );

        // Session window: 30 minutes of inactivity
        SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(
            Duration.ofMinutes(30));

        // Group events by customer and session
        KTable<Windowed<String>, CustomerSession> customerSessions = customerEvents
            .groupByKey()
            .windowedBy(sessionWindow)
            .aggregate(
                CustomerSession::new,
                (customerId, event, session) -> {
                    session.setCustomerId(customerId);
                    session.addEvent(event);
                    return session;
                },
                (aggKey, aggOne, aggTwo) -> {
                    // Merge sessions if windows merge
                    CustomerSession merged = new CustomerSession();
                    merged.setCustomerId(aggOne.getCustomerId());
                    merged.setStartTime(Math.min(aggOne.getStartTime(), aggTwo.getStartTime()));
                    merged.setEndTime(Math.max(aggOne.getEndTime(), aggTwo.getEndTime()));
                    merged.setEventCount(aggOne.getEventCount() + aggTwo.getEventCount());
                    merged.setTotalAmount(aggOne.getTotalAmount() + aggTwo.getTotalAmount());
                    merged.getViewedProducts().addAll(aggOne.getViewedProducts());
                    merged.getViewedProducts().addAll(aggTwo.getViewedProducts());
                    merged.getPurchasedProducts().addAll(aggOne.getPurchasedProducts());
                    merged.getPurchasedProducts().addAll(aggTwo.getPurchasedProducts());
                    return merged;
                },
                Materialized.<String, CustomerSession, SessionStore<Bytes, byte[]>>as("customer-sessions-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(CustomerSession.class))
            );

        // Process completed sessions
        customerSessions
            .toStream()
            .filter((windowedKey, session) ->
                session != null && session.getEventCount() >= 3) // Only sessions with 3+ events
            .map((windowedKey, session) -> {
                // Calculate session duration in minutes
                long durationMinutes = (session.getEndTime() - session.getStartTime()) / (60 * 1000);
                session.setDurationMinutes(durationMinutes);
                return KeyValue.pair(windowedKey.key(), session);
            })
            .to("customer-sessions-output",
                Produced.with(Serdes.String(), new JsonSerde<>(CustomerSession.class)));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}

Example 4: Real-time Fraud Detection

public class FraudDetectionStreamsApp {

    public static class Transaction {
        private String transactionId;
        private String customerId;
        private String merchantId;
        private Double amount;
        private String location;
        private Long timestamp;
        private String paymentMethod;

        // Constructors, getters, setters
    }

    public static class FraudScore {
        private String transactionId;
        private Double score;
        private String reason;
        private Boolean isFraudulent;

        // Constructors, getters, setters
    }

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("fraud-detection-app");
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);

        StreamsBuilder builder = new StreamsBuilder();

        final JsonSerde<Transaction> transactionSerde = new JsonSerde<>(Transaction.class);
        final JsonSerde<FraudScore> fraudScoreSerde = new JsonSerde<>(FraudScore.class);

        KStream<String, Transaction> transactions = builder.stream(
            "transactions-topic",
            Consumed.with(Serdes.String(), transactionSerde)
        );

        // Create KTable for customer profiles (e.g., average transaction amount)
        KTable<String, Double> customerAvgAmount = transactions
            .groupBy((key, transaction) -> transaction.getCustomerId(),
                Grouped.with(Serdes.String(), transactionSerde))
            .aggregate(
                () -> 0.0,
                (customerId, transaction, currentAvg) -> {
                    // Simple exponentially weighted moving average (for demo purposes)
                    return (currentAvg * 0.9) + (transaction.getAmount() * 0.1);
                },
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-avg-amount-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double())
            );

        // Re-key the stream by customerId so it matches the table's key,
        // then join stream with table for real-time fraud detection
        KStream<String, FraudScore> fraudScores = transactions
            .selectKey((key, transaction) -> transaction.getCustomerId())
            .leftJoin(customerAvgAmount,
                (transaction, avgAmount) -> {
                    FraudScore score = new FraudScore();
                    score.setTransactionId(transaction.getTransactionId());

                    // Simple fraud detection rules
                    double fraudScore = 0.0;
                    List<String> reasons = new ArrayList<>();

                    // Rule 1: Amount significantly higher than average
                    if (avgAmount != null && transaction.getAmount() > avgAmount * 3) {
                        fraudScore += 0.6;
                        reasons.add("Amount significantly above average");
                    }

                    // Rule 2: Very large absolute amount
                    if (transaction.getAmount() > 10000.0) {
                        fraudScore += 0.3;
                        reasons.add("Very large transaction amount");
                    }

                    score.setScore(fraudScore);
                    score.setReason(String.join("; ", reasons));
                    score.setIsFraudulent(fraudScore > 0.7);
                    return score;
                },
                Joined.with(Serdes.String(), transactionSerde, Serdes.Double())
            );

        // Split stream into fraudulent and legitimate transactions
        Map<String, KStream<String, FraudScore>> branches = fraudScores
            .split(Named.as("fraud-"))
            .branch((key, score) -> score.getIsFraudulent(), Branched.as("fraudulent"))
            .branch((key, score) -> !score.getIsFraudulent(), Branched.as("legitimate"))
            .noDefaultBranch();

        // Send fraudulent transactions to alert topic
        branches.get("fraud-fraudulent")
            .to("fraud-alerts-topic", Produced.with(Serdes.String(), fraudScoreSerde));

        // Send all transactions with scores to monitoring topic
        fraudScores.to("transaction-scores-topic",
            Produced.with(Serdes.String(), fraudScoreSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}

Advanced Stream Processing

Example 5: Stream-Table Join with Enrichment

public class StreamTableJoinEnrichment {

    public static class Order {
        private String orderId;
        private String customerId;
        private String productId;
        private Integer quantity;
        private Long timestamp;

        // Constructors, getters, setters
    }

    public static class Customer {
        private String customerId;
        private String name;
        private String tier; // "BRONZE", "SILVER", "GOLD"
        private String region;

        // Constructors, getters, setters
    }

    public static class EnrichedOrder {
        private Order order;
        private Customer customer;
        private Double discount;

        // Constructors, getters, setters
    }

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("stream-table-join");

        StreamsBuilder builder = new StreamsBuilder();

        final JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
        final JsonSerde<Customer> customerSerde = new JsonSerde<>(Customer.class);
        final JsonSerde<EnrichedOrder> enrichedOrderSerde = new JsonSerde<>(EnrichedOrder.class);

        // Stream of orders
        KStream<String, Order> orders = builder.stream(
            "orders-topic",
            Consumed.with(Serdes.String(), orderSerde)
        );

        // Table of customers (from compacted topic)
        KTable<String, Customer> customers = builder.table(
            "customers-topic",
            Consumed.with(Serdes.String(), customerSerde),
            Materialized.as("customers-store")
        );

        // GlobalKTable for product information (small, read-only dataset)
        GlobalKTable<String, String> products = builder.globalTable(
            "products-topic",
            Consumed.with(Serdes.String(), Serdes.String())
        );

        // Join order stream with customer table
        KStream<String, EnrichedOrder> enrichedOrders = orders
            .selectKey((key, order) -> order.getCustomerId()) // Re-key by customerId for join
            .join(customers,
                (order, customer) -> {
                    EnrichedOrder enriched = new EnrichedOrder();
                    enriched.setOrder(order);
                    enriched.setCustomer(customer);

                    // Apply tier-based discount
                    switch (customer.getTier()) {
                        case "GOLD":
                            enriched.setDiscount(0.15);
                            break;
                        case "SILVER":
                            enriched.setDiscount(0.10);
                            break;
                        case "BRONZE":
                            enriched.setDiscount(0.05);
                            break;
                        default:
                            enriched.setDiscount(0.0);
                    }
                    return enriched;
                },
                Joined.with(Serdes.String(), orderSerde, customerSerde)
            )
            .selectKey((key, enriched) -> enriched.getOrder().getOrderId()); // Back to orderId as key

        // Further enrichment with GlobalKTable (no re-keying needed)
        KStream<String, EnrichedOrder> fullyEnrichedOrders = enrichedOrders
            .join(products,
                (key, enriched) -> enriched.getOrder().getProductId(), // Key for GlobalKTable lookup
                (enriched, productName) -> {
                    // Add product name to enriched order
                    // In a real scenario, you might have a Product object
                    return enriched; // Simplified
                }
            );

        // Process enriched orders
        fullyEnrichedOrders
            .peek((key, enriched) ->
                System.out.println("Enriched order: " + key + ", Customer: " +
                    enriched.getCustomer().getName() + ", Discount: " + enriched.getDiscount()))
            .to("enriched-orders-topic",
                Produced.with(Serdes.String(), enrichedOrderSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}

Example 6: Windowed Aggregations with Suppression

public class WindowedAggregationsWithSuppression {

    public static class SalesEvent {
        private String productId;
        private String category;
        private Double amount;
        private Integer quantity;
        private Long timestamp;

        // Constructors, getters, setters
    }

    public static class SalesSummary {
        private Long windowStart;
        private Long windowEnd;
        private Integer totalQuantity;
        private Double totalAmount;
        private Long eventCount;

        // Constructors, getters, setters
    }

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("windowed-aggregations");

        StreamsBuilder builder = new StreamsBuilder();

        final JsonSerde<SalesEvent> salesEventSerde = new JsonSerde<>(SalesEvent.class);
        final JsonSerde<SalesSummary> salesSummarySerde = new JsonSerde<>(SalesSummary.class);

        KStream<String, SalesEvent> salesEvents = builder.stream(
            "sales-events-topic",
            Consumed.with(Serdes.String(), salesEventSerde)
        );

        // Tumbling window: 1 hour
        TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));

        // Hopping window: 1 hour, advancing every 15 minutes
        TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))
            .advanceBy(Duration.ofMinutes(15));

        // Sliding window: 1 hour (shown for comparison; not used below)
        SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1));

        // Tumbling window aggregation by product
        KTable<Windowed<String>, SalesSummary> productSalesHourly = salesEvents
            .groupBy((key, event) -> event.getProductId(),
                Grouped.with(Serdes.String(), salesEventSerde))
            .windowedBy(tumblingWindow)
            .aggregate(
                SalesSummary::new,
                (productId, event, summary) -> {
                    summary.setTotalQuantity(
                        (summary.getTotalQuantity() == null ? 0 : summary.getTotalQuantity()) +
                        event.getQuantity());
                    summary.setTotalAmount(
                        (summary.getTotalAmount() == null ? 0.0 : summary.getTotalAmount()) +
                        event.getAmount());
                    summary.setEventCount(
                        (summary.getEventCount() == null ? 0L : summary.getEventCount()) + 1);
                    return summary;
                },
                Materialized.<String, SalesSummary, WindowStore<Bytes, byte[]>>as("product-sales-hourly-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(salesSummarySerde)
            )
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // Category-level aggregation with hopping window
        KTable<Windowed<String>, SalesSummary> categorySales = salesEvents
            .groupBy((key, event) -> event.getCategory(),
                Grouped.with(Serdes.String(), salesEventSerde))
            .windowedBy(hoppingWindow)
            .aggregate(
                SalesSummary::new,
                (category, event, summary) -> {
                    summary.setTotalQuantity(
                        (summary.getTotalQuantity() == null ? 0 : summary.getTotalQuantity()) +
                        event.getQuantity());
                    summary.setTotalAmount(
                        (summary.getTotalAmount() == null ? 0.0 : summary.getTotalAmount()) +
                        event.getAmount());
                    return summary;
                },
                Materialized.with(Serdes.String(), salesSummarySerde)
            );

        // Output results
        productSalesHourly
            .toStream()
            .map((windowedKey, summary) -> {
                // Add window timing information
                summary.setWindowStart(windowedKey.window().start());
                summary.setWindowEnd(windowedKey.window().end());
                return KeyValue.pair(windowedKey.key(), summary);
            })
            .to("product-sales-hourly-output",
                Produced.with(Serdes.String(), salesSummarySerde));

        categorySales
            .toStream()
            .map((windowedKey, summary) -> {
                summary.setWindowStart(windowedKey.window().start());
                summary.setWindowEnd(windowedKey.window().end());
                return KeyValue.pair(windowedKey.key(), summary);
            })
            .to("category-sales-output",
                Produced.with(Serdes.String(), salesSummarySerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}

Custom Processing with Processor API

Example 7: Custom Stateful Processor

public class CustomProcessorAPIExample {

    public static class AlertProcessor implements Processor<String, String, String, String> {

        private ProcessorContext<String, String> context;
        private KeyValueStore<String, Long> alertCountStore;
        private final Long alertThreshold = 10L;
        private final Duration windowSize = Duration.ofMinutes(5);

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            this.alertCountStore = context.getStateStore("alert-count-store");

            // Schedule punctuation to clean up old entries
            this.context.schedule(
                windowSize,
                PunctuationType.WALL_CLOCK_TIME,
                this::cleanupOldEntries);
        }

        @Override
        public void process(Record<String, String> record) {
            String key = record.key();
            Long currentTime = record.timestamp();

            // Get current count or initialize
            Long currentCount = alertCountStore.get(key);
            if (currentCount == null) {
                currentCount = 0L;
            }

            // Increment count
            currentCount++;
            alertCountStore.put(key, currentCount);

            // Check threshold
            if (currentCount >= alertThreshold) {
                // Send alert
                String alertMessage = String.format(
                    "Alert: Key '%s' has triggered %d times in the last %s",
                    key, currentCount, windowSize);
                context.forward(new Record<>(key, alertMessage, currentTime));

                // Reset count after alert
                alertCountStore.put(key, 0L);
            }
        }

        private void cleanupOldEntries(long timestamp) {
            try (KeyValueIterator<String, Long> iterator = alertCountStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, Long> entry = iterator.next();
                    // In a real scenario, you might track timestamps for each key
                    // and remove entries that haven't been updated within the window size
                }
            }
        }

        @Override
        public void close() {
            // Cleanup resources
        }
    }

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("custom-processor-example");

        // State store builder for the alert counts
        StoreBuilder<KeyValueStore<String, Long>> alertCountStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("alert-count-store"),
            Serdes.String(),
            Serdes.Long()
        );

        // Build topology with custom processor
        Topology topology = new Topology();
        topology.addSource("Source", "input-topic")
                .addProcessor("AlertProcessor",
                    () -> new AlertProcessor(), "Source")
                .addStateStore(alertCountStore, "AlertProcessor")
                .addSink("Sink", "alert-output-topic", "AlertProcessor");

        KafkaStreams streams = new KafkaStreams(topology, config);

        // Add uncaught exception handler; the return value decides whether to replace
        // the failed thread, shut down the client, or shut down the whole application
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Uncaught exception in streams application: " + exception);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        });

        streams.start();
    }
}

Testing Kafka Streams Applications

Example 8: Unit Testing with TopologyTestDriver
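
TopologyTestDriver, TestInputTopic, and TestOutputTopic come from the separate org.apache.kafka:kafka-streams-test-utils artifact (test scope); the assertions below use JUnit 5 (org.junit.jupiter.api).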

public class KafkaStreamsTest {

    @Test
    public void testWordCountTopology() {
        // Setup
        Properties config = KafkaStreamsConfig.getStreamsConfig("test-word-count");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("input-topic");

        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();

        wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            // Test inputs
            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic("input-topic",
                    Serdes.String().serializer(),
                    Serdes.String().serializer());

            TestOutputTopic<String, Long> outputTopic =
                testDriver.createOutputTopic("output-topic",
                    Serdes.String().deserializer(),
                    Serdes.Long().deserializer());

            // Send test data
            inputTopic.pipeInput("hello world hello kafka");
            inputTopic.pipeInput("kafka streams are great");

            // Verify the final count per word (readKeyValuesToMap keeps the latest value per key)
            Map<String, Long> wordCountMap = outputTopic.readKeyValuesToMap();

            Assertions.assertEquals(6, wordCountMap.size());
            Assertions.assertEquals(2L, (long) wordCountMap.get("hello"));
            Assertions.assertEquals(2L, (long) wordCountMap.get("kafka"));
            Assertions.assertEquals(1L, (long) wordCountMap.get("streams"));
        }
    }
}

Monitoring and Operations

Example 9: Metrics and Monitoring

public class MonitoringStreamsApp {

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("monitoring-app");

        // Enable fine-grained metrics
        config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

        StreamsBuilder builder = new StreamsBuilder();

        // Your stream processing logic here
        KStream<String, String> input = builder.stream("input-topic");
        input.process(() -> new MonitoringProcessor()).to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        // Add state listener
        streams.setStateListener((newState, oldState) -> {
            System.out.printf("State transition: %s -> %s%n", oldState, newState);
        });

        // Add exception handler; returning REPLACE_THREAD keeps the application running
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Uncaught exception in streams application");
            exception.printStackTrace();
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        streams.start();

        // Monitor specific metrics by polling streams.metrics() periodically
        ScheduledExecutorService metricsPoller = Executors.newSingleThreadScheduledExecutor();
        metricsPoller.scheduleAtFixedRate(() -> {
            streams.metrics().forEach((metricName, metric) -> {
                if ("process-rate".equals(metricName.name())) {
                    Object value = metric.metricValue();
                    if (value instanceof Double && (Double) value < 10.0) {
                        System.out.println("Warning: Low processing rate: " + value);
                    }
                }
            });
        }, 30, 30, TimeUnit.SECONDS);
    }

    public static class MonitoringProcessor implements Processor<String, String, String, String> {

        private ProcessorContext<String, String> context;
        private long processedCount = 0;
        private long lastLogTime = System.currentTimeMillis();

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            processedCount++;

            // Log progress every 1000 records or 30 seconds
            long currentTime = System.currentTimeMillis();
            if (processedCount % 1000 == 0 || currentTime - lastLogTime > 30000) {
                System.out.printf("Processed %d records. Last key: %s%n",
                    processedCount, record.key());
                lastLogTime = currentTime;
            }

            // Forward record
            context.forward(record);
        }

        @Override
        public void close() {
            System.out.println("Processor closed. Total processed: " + processedCount);
        }
    }
}

Best Practices

  1. Use exactly-once processing for critical applications
  2. Choose appropriate window types based on use case requirements
  3. Monitor state store sizes and implement cleanup strategies
  4. Use suppression to control output rate for windowed operations
  5. Implement proper error handling and dead letter queues (a minimal sketch follows this list)
  6. Test thoroughly with TopologyTestDriver
  7. Monitor metrics for performance and troubleshooting
  8. Use appropriate serdes for your data types
  9. Consider partitioning strategy for optimal performance
  10. Plan for state store cleanup and maintenance
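
For point 5, a common approach is to catch failures inside the topology and route the offending records to a dead-letter topic rather than letting the stream thread die. The following is a minimal sketch of that pattern; the topic names and the parsing step are illustrative assumptions, and it reuses KafkaStreamsConfig and the imports listed earlier.

public class DeadLetterQueueExample {

    public static void main(String[] args) {
        Properties config = KafkaStreamsConfig.getStreamsConfig("dlq-example");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("orders-topic");

        // Tag each record as processed or failed instead of throwing from the topology
        Map<String, KStream<String, String>> routed = input
            .mapValues(value -> {
                try {
                    // Hypothetical processing step that may throw (e.g. parsing/validation)
                    return "OK:" + Integer.parseInt(value.trim());
                } catch (Exception e) {
                    return "ERR:" + e.getMessage();
                }
            })
            .split(Named.as("dlq-"))
            .branch((key, value) -> value.startsWith("OK:"), Branched.as("processed"))
            .branch((key, value) -> value.startsWith("ERR:"), Branched.as("failed"))
            .noDefaultBranch();

        routed.get("dlq-processed").to("orders-processed-topic");
        routed.get("dlq-failed").to("orders-dlq-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}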

Kafka Streams provides a powerful yet simple way to build real-time data processing applications that can scale with your data needs while providing strong guarantees about processing semantics.
