Overview
Kafka Streams is a client library for building applications and microservices that process data stored in Apache Kafka. It provides a simple and lightweight way to perform stateful and stateless processing on real-time data streams.
Key Concepts
- Stream: An unbounded, continuously updating data set
- Table: A changelog stream where each record represents an update
- Topology: A graph of stream processors (nodes) and their connections (edges)
- KStream: Abstraction for record streams
- KTable: Abstraction for changelog streams
- GlobalKTable: Replicated table across all application instances
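To make these abstractions concrete, here is a minimal sketch (topic names are illustrative) of how each one is declared:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
public class StreamTableDuality {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // KStream: every record is an independent event (a fact)
        KStream<String, String> clicks = builder.stream("clicks",
            Consumed.with(Serdes.String(), Serdes.String()));
        // KTable: records with the same key are updates to one row (state)
        KTable<String, String> userProfiles = builder.table("user-profiles",
            Consumed.with(Serdes.String(), Serdes.String()));
        // GlobalKTable: the full table is replicated to every instance,
        // so joins against it need no co-partitioning
        GlobalKTable<String, String> countries = builder.globalTable("countries",
            Consumed.with(Serdes.String(), Serdes.String()));
    }
}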
Setup and Dependencies
Maven Dependencies
<properties>
<kafka.version>3.4.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
Basic Configuration
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class KafkaStreamsConfig {
public static Properties getStreamsConfig(String applicationId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Processing guarantees
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// State directory
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
return props;
}
}
Basic Stream Processing Examples
Example 1: Simple Stream Processing
public class SimpleStreamProcessor {
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("simple-stream-processor");
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> sourceStream = builder.stream("input-topic");
// Process records - convert values to uppercase
KStream<String, String> transformedStream = sourceStream
.peek((key, value) -> System.out.println("Processing: key=" + key + ", value=" + value))
.mapValues(value -> value.toUpperCase())
.filter((key, value) -> value.length() > 5);
// Write to output topic
transformedStream.to("output-topic");
// Start the streams application
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}
}
Example 2: Word Count Application
public class WordCountStreamsApp {
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("word-count-app");
StreamsBuilder builder = new StreamsBuilder();
// Read from text lines topic
KStream<String, String> textLines = builder.stream("text-lines-topic");
// Word count logic
KTable<String, Long> wordCounts = textLines
// Split each text line into words
.flatMapValues(textLine ->
Arrays.asList(textLine.toLowerCase().split("\\W+")))
// Group by word
.groupBy((key, word) -> word)
// Count occurrences
.count(Materialized.as("word-counts-store"));
// Write to output topic
wordCounts
.toStream()
.to("word-counts-topic",
Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Add state listener for monitoring
streams.setStateListener((newState, oldState) -> {
System.out.println("State changed from " + oldState + " to " + newState);
});
streams.start();
}
}
Stateful Processing
Example 3: Customer Session Analysis
public class CustomerSessionAnalysis {
// Custom value classes
public static class CustomerEvent {
private String customerId;
private String eventType; // "VIEW", "PURCHASE", "CLICK"
private String productId;
private Double amount;
private Long timestamp;
// Constructors, getters, setters
public CustomerEvent() {}
public CustomerEvent(String customerId, String eventType, String productId, Double amount, Long timestamp) {
this.customerId = customerId;
this.eventType = eventType;
this.productId = productId;
this.amount = amount;
this.timestamp = timestamp;
}
// Getters and setters...
}
public static class CustomerSession {
private String customerId;
private Long startTime;
private Long endTime;
private Integer eventCount = 0;
private Double totalAmount = 0.0;
private Long durationMinutes; // set when the session is finalized (see below)
private Set<String> viewedProducts = new HashSet<>();
private Set<String> purchasedProducts = new HashSet<>();
// Methods to update session
public void addEvent(CustomerEvent event) {
this.eventCount++;
if ("PURCHASE".equals(event.getEventType()) && event.getAmount() != null) {
this.totalAmount += event.getAmount();
this.purchasedProducts.add(event.getProductId());
} else if ("VIEW".equals(event.getEventType())) {
this.viewedProducts.add(event.getProductId());
}
if (this.startTime == null || event.getTimestamp() < this.startTime) {
this.startTime = event.getTimestamp();
}
if (this.endTime == null || event.getTimestamp() > this.endTime) {
this.endTime = event.getTimestamp();
}
}
// Getters and setters...
}
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("customer-session-analysis");
// Explicit serdes are supplied per operation below, so no default-serde
// overrides are needed (a type-less default JsonSerde could not deserialize
// into specific classes anyway)
StreamsBuilder builder = new StreamsBuilder();
// Create custom serde for CustomerEvent
final JsonSerde<CustomerEvent> customerEventSerde = new JsonSerde<>(CustomerEvent.class);
KStream<String, CustomerEvent> customerEvents = builder.stream(
"customer-events-topic",
Consumed.with(Serdes.String(), customerEventSerde)
);
// Session window: 30 minutes of inactivity
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(
Duration.ofMinutes(30));
// Group events by customer and session
KTable<Windowed<String>, CustomerSession> customerSessions = customerEvents
.groupByKey()
.windowedBy(sessionWindow)
.aggregate(
CustomerSession::new,
(customerId, event, session) -> {
session.setCustomerId(customerId);
session.addEvent(event);
return session;
},
(aggKey, aggOne, aggTwo) -> {
// Merge sessions if windows merge
CustomerSession merged = new CustomerSession();
merged.setCustomerId(aggOne.getCustomerId());
merged.setStartTime(Math.min(aggOne.getStartTime(), aggTwo.getStartTime()));
merged.setEndTime(Math.max(aggOne.getEndTime(), aggTwo.getEndTime()));
merged.setEventCount(aggOne.getEventCount() + aggTwo.getEventCount());
merged.setTotalAmount(aggOne.getTotalAmount() + aggTwo.getTotalAmount());
merged.getViewedProducts().addAll(aggOne.getViewedProducts());
merged.getViewedProducts().addAll(aggTwo.getViewedProducts());
merged.getPurchasedProducts().addAll(aggOne.getPurchasedProducts());
merged.getPurchasedProducts().addAll(aggTwo.getPurchasedProducts());
return merged;
},
Materialized.<String, CustomerSession, SessionStore<Bytes, byte[]>>as("customer-sessions-store")
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde<>(CustomerSession.class))
);
// Process completed sessions
customerSessions
.toStream()
.filter((windowedKey, session) ->
session != null && session.getEventCount() >= 3) // Only sessions with 3+ events
.map((windowedKey, session) -> {
// Calculate session duration in minutes
long durationMinutes = (session.getEndTime() - session.getStartTime()) / (60 * 1000);
session.setDurationMinutes(durationMinutes);
return KeyValue.pair(windowedKey.key(), session);
})
.to("customer-sessions-output",
Produced.with(Serdes.String(), new JsonSerde<>(CustomerSession.class)));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
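The examples in this section assume a generic JsonSerde, which Kafka Streams does not ship. Spring Kafka provides one (org.springframework.kafka.support.serializer.JsonSerde); alternatively, a minimal Jackson-based sketch looks like this, assuming jackson-databind is on the classpath:
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerde<T> implements Serde<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;
    public JsonSerde(Class<T> type) {
        this.type = type;
    }
    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (IOException e) {
                throw new SerializationException("JSON serialization failed", e);
            }
        };
    }
    @Override
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, type);
            } catch (IOException e) {
                throw new SerializationException("JSON deserialization failed", e);
            }
        };
    }
}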
Example 4: Real-time Fraud Detection
public class FraudDetectionStreamsApp {
public static class Transaction {
private String transactionId;
private String customerId;
private String merchantId;
private Double amount;
private String location;
private Long timestamp;
private String paymentMethod;
// Constructors, getters, setters
}
public static class FraudScore {
private String transactionId;
private Double score;
private String reason;
private Boolean isFraudulent;
// Constructors, getters, setters
}
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("fraud-detection-app");
// Explicit serdes are passed per operation below, so no default-serde override is needed
StreamsBuilder builder = new StreamsBuilder();
final JsonSerde<Transaction> transactionSerde = new JsonSerde<>(Transaction.class);
final JsonSerde<FraudScore> fraudScoreSerde = new JsonSerde<>(FraudScore.class);
KStream<String, Transaction> transactions = builder.stream(
"transactions-topic",
Consumed.with(Serdes.String(), transactionSerde)
);
// Create KTable for customer profiles (e.g., average transaction amount)
KTable<String, Double> customerAvgAmount = transactions
    .groupBy((key, transaction) -> transaction.getCustomerId(),
        Grouped.with(Serdes.String(), transactionSerde))
    .aggregate(
        () -> 0.0,
        (customerId, transaction, currentAvg) -> {
            // Exponentially weighted moving average (for demo purposes)
            return (currentAvg * 0.9) + (transaction.getAmount() * 0.1);
        },
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-avg-amount-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );
// Join stream with table for real-time fraud detection
// (re-key by customerId so the stream key matches the table's key)
KStream<String, FraudScore> fraudScores = transactions
    .selectKey((key, transaction) -> transaction.getCustomerId())
    .leftJoin(customerAvgAmount,
(transaction, avgAmount) -> {
FraudScore score = new FraudScore();
score.setTransactionId(transaction.getTransactionId());
// Simple fraud detection rules
double fraudScore = 0.0;
List<String> reasons = new ArrayList<>();
// Rule 1: Amount significantly higher than average
if (avgAmount != null && transaction.getAmount() > avgAmount * 3) {
fraudScore += 0.6;
reasons.add("Amount significantly above average");
}
// Rule 2: Very large absolute amount
if (transaction.getAmount() > 10000.0) {
fraudScore += 0.3;
reasons.add("Very large transaction amount");
}
score.setScore(fraudScore);
score.setReason(String.join("; ", reasons));
score.setIsFraudulent(fraudScore > 0.7);
return score;
},
Joined.with(Serdes.String(), transactionSerde, Serdes.Double())
);
// Split stream into fraudulent and legitimate transactions
Map<String, KStream<String, FraudScore>> branches = fraudScores
.split(Named.as("fraud-"))
.branch((key, score) -> score.getIsFraudulent(), Branched.as("fraudulent"))
.branch((key, score) -> !score.getIsFraudulent(), Branched.as("legitimate"))
.noDefaultBranch();
// Send fraudulent transactions to alert topic
branches.get("fraud-fraudulent")
.to("fraud-alerts-topic", Produced.with(Serdes.String(), fraudScoreSerde));
// Send all transactions with scores to monitoring topic
fraudScores.to("transaction-scores-topic",
Produced.with(Serdes.String(), fraudScoreSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
Advanced Stream Processing
Example 5: Stream-Table Join with Enrichment
public class StreamTableJoinEnrichment {
public static class Order {
private String orderId;
private String customerId;
private String productId;
private Integer quantity;
private Long timestamp;
// Constructors, getters, setters
}
public static class Customer {
private String customerId;
private String name;
private String tier; // "BRONZE", "SILVER", "GOLD"
private String region;
// Constructors, getters, setters
}
public static class EnrichedOrder {
private Order order;
private Customer customer;
private Double discount;
// Constructors, getters, setters
}
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("stream-table-join");
StreamsBuilder builder = new StreamsBuilder();
final JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
final JsonSerde<Customer> customerSerde = new JsonSerde<>(Customer.class);
final JsonSerde<EnrichedOrder> enrichedOrderSerde = new JsonSerde<>(EnrichedOrder.class);
// Stream of orders
KStream<String, Order> orders = builder.stream(
"orders-topic",
Consumed.with(Serdes.String(), orderSerde)
);
// Table of customers (from compacted topic)
KTable<String, Customer> customers = builder.table(
"customers-topic",
Consumed.with(Serdes.String(), customerSerde),
Materialized.as("customers-store")
);
// GlobalKTable for product information (small, read-only dataset)
GlobalKTable<String, String> products = builder.globalTable(
"products-topic",
Consumed.with(Serdes.String(), Serdes.String())
);
// Join order stream with customer table
KStream<String, EnrichedOrder> enrichedOrders = orders
.selectKey((key, order) -> order.getCustomerId()) // Re-key by customerId for join
.join(customers,
(order, customer) -> {
EnrichedOrder enriched = new EnrichedOrder();
enriched.setOrder(order);
enriched.setCustomer(customer);
// Apply tier-based discount
switch (customer.getTier()) {
case "GOLD":
enriched.setDiscount(0.15);
break;
case "SILVER":
enriched.setDiscount(0.10);
break;
case "BRONZE":
enriched.setDiscount(0.05);
break;
default:
enriched.setDiscount(0.0);
}
return enriched;
},
Joined.with(Serdes.String(), orderSerde, customerSerde)
)
.selectKey((key, enriched) -> enriched.getOrder().getOrderId()); // Back to orderId as key
// Further enrichment with GlobalKTable (no re-keying needed)
KStream<String, EnrichedOrder> fullyEnrichedOrders = enrichedOrders
.join(products,
(key, enriched) -> enriched.getOrder().getProductId(), // Key for GlobalKTable lookup
(enriched, productName) -> {
// Add product name to enriched order
// In real scenario, you might have a Product object
return enriched; // Simplified
}
);
// Process enriched orders
fullyEnrichedOrders
.peek((key, enriched) ->
System.out.println("Enriched order: " + key + ", Customer: " +
enriched.getCustomer().getName() + ", Discount: " + enriched.getDiscount()))
.to("enriched-orders-topic",
Produced.with(Serdes.String(), enrichedOrderSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
Example 6: Windowed Aggregations with Suppression
public class WindowedAggregationsWithSuppression {
public static class SalesEvent {
private String productId;
private String category;
private Double amount;
private Integer quantity;
private Long timestamp;
// Constructors, getters, setters
}
public static class SalesSummary {
private Long windowStart;
private Long windowEnd;
private Integer totalQuantity;
private Double totalAmount;
private Long eventCount;
// Constructors, getters, setters
}
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("windowed-aggregations");
StreamsBuilder builder = new StreamsBuilder();
final JsonSerde<SalesEvent> salesEventSerde = new JsonSerde<>(SalesEvent.class);
final JsonSerde<SalesSummary> salesSummarySerde = new JsonSerde<>(SalesSummary.class);
KStream<String, SalesEvent> salesEvents = builder.stream(
"sales-events-topic",
Consumed.with(Serdes.String(), salesEventSerde)
);
// Tumbling window: 1 hour
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));
// Hopping window: 1 hour, advancing every 15 minutes
// (hopping windows are plain TimeWindows with advanceBy; there is no separate HoppingWindows class)
TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))
    .advanceBy(Duration.ofMinutes(15));
// Sliding window: 1 hour (shown for comparison; not used below)
SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1));
// Tumbling window aggregation by product
KTable<Windowed<String>, SalesSummary> productSalesHourly = salesEvents
.groupBy((key, event) -> event.getProductId(),
    Grouped.with(Serdes.String(), salesEventSerde)) // re-keying needs explicit serdes for the repartition topic
.windowedBy(tumblingWindow)
.aggregate(
SalesSummary::new,
(productId, event, summary) -> {
summary.setTotalQuantity(
(summary.getTotalQuantity() == null ? 0 : summary.getTotalQuantity()) +
event.getQuantity());
summary.setTotalAmount(
(summary.getTotalAmount() == null ? 0.0 : summary.getTotalAmount()) +
event.getAmount());
summary.setEventCount(
(summary.getEventCount() == null ? 0L : summary.getEventCount()) + 1);
return summary;
},
Materialized.<String, SalesSummary, WindowStore<Bytes, byte[]>>as("product-sales-hourly-store")
.withKeySerde(Serdes.String())
.withValueSerde(salesSummarySerde)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
// Category-level aggregation with hopping window
KTable<Windowed<String>, SalesSummary> categorySales = salesEvents
    .groupBy((key, event) -> event.getCategory(),
        Grouped.with(Serdes.String(), salesEventSerde))
    .windowedBy(hoppingWindow)
    .aggregate(
        SalesSummary::new,
        (category, event, summary) -> {
            summary.setTotalQuantity(
                (summary.getTotalQuantity() == null ? 0 : summary.getTotalQuantity()) +
                event.getQuantity());
            summary.setTotalAmount(
                (summary.getTotalAmount() == null ? 0.0 : summary.getTotalAmount()) +
                event.getAmount());
            return summary;
        },
        Materialized.<String, SalesSummary, WindowStore<Bytes, byte[]>>as("category-sales-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(salesSummarySerde)
    );
// Output results
productSalesHourly
.toStream()
.map((windowedKey, summary) -> {
// Add window timing information
summary.setWindowStart(windowedKey.window().start());
summary.setWindowEnd(windowedKey.window().end());
return KeyValue.pair(windowedKey.key(), summary);
})
.to("product-sales-hourly-output",
Produced.with(Serdes.String(), salesSummarySerde));
categorySales
.toStream()
.map((windowedKey, summary) -> {
summary.setWindowStart(windowedKey.window().start());
summary.setWindowEnd(windowedKey.window().end());
return KeyValue.pair(windowedKey.key(), summary);
})
.to("category-sales-output",
Produced.with(Serdes.String(), salesSummarySerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
Custom Processing with Processor API
Example 7: Custom Stateful Processor
public class CustomProcessorAPIExample {
public static class AlertProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> alertCountStore;
private final Long alertThreshold = 10L;
private final Duration windowSize = Duration.ofMinutes(5);
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
this.alertCountStore = context.getStateStore("alert-count-store");
// Schedule punctuation to clean up old entries
this.context.schedule(
windowSize,
PunctuationType.WALL_CLOCK_TIME,
this::cleanupOldEntries);
}
@Override
public void process(Record<String, String> record) {
String key = record.key();
Long currentTime = record.timestamp();
// Get current count or initialize
Long currentCount = alertCountStore.get(key);
if (currentCount == null) {
currentCount = 0L;
}
// Increment count
currentCount++;
alertCountStore.put(key, currentCount);
// Check threshold
if (currentCount >= alertThreshold) {
// Send alert
String alertMessage = String.format(
"Alert: Key '%s' has triggered %d times in the last %s",
key, currentCount, windowSize);
context.forward(new Record<>(key, alertMessage, currentTime));
// Reset count after alert
alertCountStore.put(key, 0L);
}
}
private void cleanupOldEntries(long timestamp) {
try (KeyValueIterator<String, Long> iterator = alertCountStore.all()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
// In a real scenario, you might track timestamps for each key
// and remove entries that haven't been updated in the window size
}
}
}
@Override
public void close() {
// Cleanup resources
}
}
public static void main(String[] args) {
    Properties config = KafkaStreamsConfig.getStreamsConfig("custom-processor-example");
    // State store for per-key alert counts
    StoreBuilder<KeyValueStore<String, Long>> alertCountStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("alert-count-store"),
        Serdes.String(),
        Serdes.Long()
    );
    // Wire the topology directly with the Processor API; mixing StreamsBuilder
    // with manual Topology calls would register the store twice
    Topology topology = new Topology();
    topology.addSource("Source", "input-topic")
        .addProcessor("AlertProcessor", () -> new AlertProcessor(), "Source")
        .addStateStore(alertCountStore, "AlertProcessor")
        .addSink("Sink", "alert-output-topic", "AlertProcessor");
KafkaStreams streams = new KafkaStreams(topology, config);
// Add uncaught exception handler (the handler receives the thrown exception
// and returns how the runtime should react)
streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Uncaught exception: " + exception);
    // SHUTDOWN_APPLICATION stops all instances; REPLACE_THREAD would keep the app running
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
streams.start();
}
}
Testing Kafka Streams Applications
Example 8: Unit Testing with TopologyTestDriver
public class KafkaStreamsTest {
@Test
public void testWordCountTopology() {
// Setup
Properties config = KafkaStreamsConfig.getStreamsConfig("test-word-count");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("input-topic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
// Test inputs
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic",
Serdes.String().serializer(),
Serdes.String().serializer());
TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic("output-topic",
Serdes.String().deserializer(),
Serdes.Long().deserializer());
// Send test data
inputTopic.pipeInput("hello world hello kafka");
inputTopic.pipeInput("kafka streams are great");
// Verify results: each output record is a changelog update, so keep
// only the latest count per word
List<KeyValue<String, Long>> results = outputTopic.readKeyValuesToList();
Map<String, Long> wordCountMap = new HashMap<>();
for (KeyValue<String, Long> result : results) {
    wordCountMap.put(result.key, result.value);
}
Assertions.assertEquals(2L, (long) wordCountMap.get("hello"));
Assertions.assertEquals(2L, (long) wordCountMap.get("kafka"));
Assertions.assertEquals(1L, (long) wordCountMap.get("streams"));
}
}
}
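Windowed topologies can be tested the same way, since TopologyTestDriver advances event time from the timestamps attached to piped records. A sketch for a hypothetical per-key count over 1-minute tumbling windows (topic names are illustrative); it can live in the same test class:
@Test
public void testWindowedCountTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count()
        .toStream()
        .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
        .to("counts", Produced.with(Serdes.String(), Serdes.Long()));
    try (TopologyTestDriver testDriver = new TopologyTestDriver(
            builder.build(), KafkaStreamsConfig.getStreamsConfig("test-windowed-count"))) {
        TestInputTopic<String, String> input = testDriver.createInputTopic("events",
            Serdes.String().serializer(), Serdes.String().serializer());
        TestOutputTopic<String, Long> output = testDriver.createOutputTopic("counts",
            Serdes.String().deserializer(), Serdes.Long().deserializer());
        // Record timestamps drive event time explicitly
        input.pipeInput("user-1", "click", 0L);        // window [0, 60s)
        input.pipeInput("user-1", "click", 30_000L);   // same window, count becomes 2
        input.pipeInput("user-1", "click", 90_000L);   // window [60s, 120s), count 1
        // Without suppression, each input yields one changelog update
        Assertions.assertEquals(List.of(1L, 2L, 1L), output.readValuesToList());
    }
}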
Monitoring and Operations
Example 9: Metrics and Monitoring
public class MonitoringStreamsApp {
public static void main(String[] args) {
Properties config = KafkaStreamsConfig.getStreamsConfig("monitoring-app");
// Enable metrics
config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
StreamsBuilder builder = new StreamsBuilder();
// Your stream processing logic here (explicit serdes give the processor typed records)
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .process(() -> new MonitoringProcessor())
    .to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Add state listener
streams.setStateListener((newState, oldState) -> {
System.out.printf("State transition: %s -> %s%n", oldState, newState);
});
// Kafka Streams exposes built-in metrics via streams.metrics() rather than
// a listener callback; sample them periodically
ScheduledExecutorService metricsSampler = Executors.newSingleThreadScheduledExecutor();
metricsSampler.scheduleAtFixedRate(() -> {
    streams.metrics().forEach((name, metric) -> {
        if ("process-rate".equals(name.name()) && metric.metricValue() instanceof Double) {
            double processRate = (Double) metric.metricValue();
            if (processRate < 10.0) {
                System.out.println("Warning: Low processing rate: " + processRate);
            }
        }
    });
}, 30, 30, TimeUnit.SECONDS);
// Add exception handler; REPLACE_THREAD restarts the failed stream thread
streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Uncaught exception in stream thread");
    exception.printStackTrace();
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
streams.start();
}
public static class MonitoringProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private long processedCount = 0;
private long lastLogTime = System.currentTimeMillis();
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
processedCount++;
// Log progress every 1000 records or 30 seconds
long currentTime = System.currentTimeMillis();
if (processedCount % 1000 == 0 || currentTime - lastLogTime > 30000) {
System.out.printf("Processed %d records. Last key: %s%n",
processedCount, record.key());
lastLogTime = currentTime;
}
// Forward record
context.forward(record);
}
@Override
public void close() {
System.out.println("Processor closed. Total processed: " + processedCount);
}
}
}
Best Practices
- Use exactly-once processing for critical applications
- Choose appropriate window types based on use case requirements
- Monitor state store sizes and implement cleanup strategies
- Use suppression to control output rate for windowed operations
- Implement proper error handling and dead letter queues (see the sketch after this list)
- Test thoroughly with TopologyTestDriver
- Monitor metrics for performance and troubleshooting
- Use appropriate serdes for your data types
- Consider partitioning strategy for optimal performance
- Plan for state store cleanup and maintenance
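For the dead-letter-queue item above, here is a minimal sketch using Kafka's DeserializationExceptionHandler interface; the DLQ topic name, bootstrap address, and producer wiring are illustrative assumptions:
public class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    private KafkaProducer<byte[], byte[]> dlqProducer;
    @Override
    public void configure(Map<String, ?> configs) {
        // Illustrative: in production, derive these settings from the handler's configs
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        dlqProducer = new KafkaProducer<>(props,
            new ByteArraySerializer(), new ByteArraySerializer());
    }
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Forward the raw bytes to a dead letter topic, then skip the bad record
        dlqProducer.send(new ProducerRecord<>("dead-letter-topic", record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}
// Register the handler:
// config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
//     DeadLetterQueueExceptionHandler.class);
Returning CONTINUE skips the poisoned record; returning FAIL would stop the stream thread instead.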
Kafka Streams provides a powerful yet simple way to build real-time data processing applications that can scale with your data needs while providing strong guarantees about processing semantics.