Building Robust, Scalable Stateful Applications
Apache Flink's stateful computations make it possible to build sophisticated stream processing applications that maintain context across events. Unlike stateless operations, stateful computations remember information from earlier events, enabling patterns such as sessionization, windowed aggregations, and complex event processing.
Flink State Architecture Overview
Key Concepts:
- Keyed State: State partitioned by key, accessible only in keyed contexts
- Operator State: State tied to operator instances, non-partitioned
- State Backends: Where state lives at runtime (HashMapStateBackend on the JVM heap, EmbeddedRocksDBStateBackend on local disk)
- Checkpoints: Consistent snapshots of distributed state
- Savepoints: Manual checkpoints for stopping/resuming applications
State Types:
- ValueState: Single value per key
- ListState: List of values per key
- MapState: Key-value map per key
- ReducingState: Single value maintained by a ReduceFunction (input and result share one type)
- AggregatingState: Like ReducingState, but the input, accumulator, and output types may differ
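The difference between the last two state types can be seen without Flink at all: a reduce combines values of a single type, while an aggregate folds inputs into an accumulator that produces a different output type. A Flink-independent sketch in plain Java (class and method names are illustrative, not Flink API):

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceVsAggregate {
    // ReducingState semantics: input and result share one type (double -> double)
    static double reduceAmounts(List<Double> amounts) {
        BinaryOperator<Double> reducer = Double::sum;
        return amounts.stream().reduce(0.0, reducer);
    }

    // AggregatingState semantics: inputs (double) fold into an accumulator
    // (count + sum) that yields a different output (the running average)
    static double aggregateAverage(List<Double> amounts) {
        double[] acc = {0.0, 0.0}; // {count, sum}
        for (double a : amounts) { acc[0]++; acc[1] += a; }
        return acc[0] == 0 ? 0.0 : acc[1] / acc[0];
    }

    public static void main(String[] args) {
        List<Double> amounts = List.of(10.0, 20.0, 30.0);
        System.out.println(reduceAmounts(amounts));    // 60.0
        System.out.println(aggregateAverage(amounts)); // 20.0
    }
}
```

In Flink, the same distinction shows up in the signatures: ReducingStateDescriptor takes a ReduceFunction&lt;T&gt;, while AggregatingStateDescriptor takes an AggregateFunction&lt;IN, ACC, OUT&gt;.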
1. Project Setup and Dependencies
Maven Configuration (pom.xml):
<properties>
<flink.version>1.17.1</flink.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CLI for running applications -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink State Processor API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- RocksDB state backend (used by the state backend configuration below) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka connector (used by the application in section 6) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checkpoints on local disk or HDFS need no extra dependency; object stores
     require a filesystem plugin such as flink-s3-fs-hadoop -->
<!-- Testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
2. Core Data Models and POJOs
package com.flink.stateful.models;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
// Basic event model
public class TransactionEvent {
private String transactionId;
private String customerId;
private String merchantId;
private double amount;
private String currency;
private Instant timestamp;
private TransactionType transactionType;
public TransactionEvent() {}
public TransactionEvent(String transactionId, String customerId, String merchantId,
double amount, String currency, Instant timestamp,
TransactionType transactionType) {
this.transactionId = transactionId;
this.customerId = customerId;
this.merchantId = merchantId;
this.amount = amount;
this.currency = currency;
this.timestamp = timestamp;
this.transactionType = transactionType;
}
// Getters and setters
public String getTransactionId() { return transactionId; }
public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public String getMerchantId() { return merchantId; }
public void setMerchantId(String merchantId) { this.merchantId = merchantId; }
public double getAmount() { return amount; }
public void setAmount(double amount) { this.amount = amount; }
public String getCurrency() { return currency; }
public void setCurrency(String currency) { this.currency = currency; }
public Instant getTimestamp() { return timestamp; }
public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
public TransactionType getTransactionType() { return transactionType; }
public void setTransactionType(TransactionType transactionType) { this.transactionType = transactionType; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TransactionEvent that = (TransactionEvent) o;
return Double.compare(that.amount, amount) == 0 &&
Objects.equals(transactionId, that.transactionId) &&
Objects.equals(customerId, that.customerId) &&
Objects.equals(merchantId, that.merchantId) &&
Objects.equals(currency, that.currency) &&
Objects.equals(timestamp, that.timestamp) &&
transactionType == that.transactionType;
}
@Override
public int hashCode() {
return Objects.hash(transactionId, customerId, merchantId, amount, currency, timestamp, transactionType);
}
@Override
public String toString() {
return "TransactionEvent{" +
"transactionId='" + transactionId + '\'' +
", customerId='" + customerId + '\'' +
", merchantId='" + merchantId + '\'' +
", amount=" + amount +
", currency='" + currency + '\'' +
", timestamp=" + timestamp +
", transactionType=" + transactionType +
'}';
}
}
// Transaction types
enum TransactionType {
PURCHASE,
REFUND,
CHARGEBACK,
TRANSFER,
FEE
}
// Customer profile for state
class CustomerProfile {
private String customerId;
private double totalSpent;
private int transactionCount;
private double averageTransaction;
private Instant firstSeen;
private Instant lastSeen;
private String favoriteMerchant;
private Map<String, Integer> merchantFrequency;
public CustomerProfile() {
this.merchantFrequency = new HashMap<>();
}
public CustomerProfile(String customerId) {
this();
this.customerId = customerId;
this.firstSeen = Instant.now();
this.lastSeen = Instant.now();
}
public void updateWithTransaction(TransactionEvent transaction) {
this.transactionCount++;
this.totalSpent += transaction.getAmount();
this.averageTransaction = totalSpent / transactionCount;
this.lastSeen = transaction.getTimestamp();
// Update merchant frequency
merchantFrequency.merge(transaction.getMerchantId(), 1, Integer::sum);
// Update favorite merchant
this.favoriteMerchant = merchantFrequency.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(null);
}
// Getters and setters
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public double getTotalSpent() { return totalSpent; }
public void setTotalSpent(double totalSpent) { this.totalSpent = totalSpent; }
public int getTransactionCount() { return transactionCount; }
public void setTransactionCount(int transactionCount) { this.transactionCount = transactionCount; }
public double getAverageTransaction() { return averageTransaction; }
public void setAverageTransaction(double averageTransaction) { this.averageTransaction = averageTransaction; }
public Instant getFirstSeen() { return firstSeen; }
public void setFirstSeen(Instant firstSeen) { this.firstSeen = firstSeen; }
public Instant getLastSeen() { return lastSeen; }
public void setLastSeen(Instant lastSeen) { this.lastSeen = lastSeen; }
public String getFavoriteMerchant() { return favoriteMerchant; }
public void setFavoriteMerchant(String favoriteMerchant) { this.favoriteMerchant = favoriteMerchant; }
public Map<String, Integer> getMerchantFrequency() { return merchantFrequency; }
public void setMerchantFrequency(Map<String, Integer> merchantFrequency) { this.merchantFrequency = merchantFrequency; }
}
// Alert types
class FraudAlert {
private String alertId;
private String customerId;
private String reason;
private double suspiciousAmount;
private Instant detectedAt;
private AlertSeverity severity;
public FraudAlert() {}
public FraudAlert(String customerId, String reason, double suspiciousAmount, AlertSeverity severity) {
this.alertId = UUID.randomUUID().toString();
this.customerId = customerId;
this.reason = reason;
this.suspiciousAmount = suspiciousAmount;
this.detectedAt = Instant.now();
this.severity = severity;
}
// Getters and setters
public String getAlertId() { return alertId; }
public void setAlertId(String alertId) { this.alertId = alertId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public String getReason() { return reason; }
public void setReason(String reason) { this.reason = reason; }
public double getSuspiciousAmount() { return suspiciousAmount; }
public void setSuspiciousAmount(double suspiciousAmount) { this.suspiciousAmount = suspiciousAmount; }
public Instant getDetectedAt() { return detectedAt; }
public void setDetectedAt(Instant detectedAt) { this.detectedAt = detectedAt; }
public AlertSeverity getSeverity() { return severity; }
public void setSeverity(AlertSeverity severity) { this.severity = severity; }
@Override
public String toString() {
return "FraudAlert{" +
"alertId='" + alertId + '\'' +
", customerId='" + customerId + '\'' +
", reason='" + reason + '\'' +
", suspiciousAmount=" + suspiciousAmount +
", detectedAt=" + detectedAt +
", severity=" + severity +
'}';
}
}
enum AlertSeverity {
LOW,
MEDIUM,
HIGH,
CRITICAL
}
3. Basic Stateful Functions
package com.flink.stateful.functions;
import com.flink.stateful.models.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
* Maintains customer profile state and detects behavioral changes
*/
public class CustomerProfileFunction extends RichFlatMapFunction<TransactionEvent, CustomerProfile> {
// ValueState to store customer profile
private transient ValueState<CustomerProfile> customerProfileState;
// State TTL configuration
private final StateTtlConfig ttlConfig;
public CustomerProfileFunction() {
// Configure state TTL - automatically cleanup state after 30 days of inactivity
this.ttlConfig = StateTtlConfig.newBuilder(Time.days(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInBackground()
.build();
}
@Override
public void open(Configuration parameters) throws Exception {
// Initialize state descriptor with TTL
ValueStateDescriptor<CustomerProfile> descriptor =
new ValueStateDescriptor<>("customerProfile", CustomerProfile.class);
descriptor.enableTimeToLive(ttlConfig);
customerProfileState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(TransactionEvent transaction, Collector<CustomerProfile> out) throws Exception {
String customerId = transaction.getCustomerId();
// Get current profile or create new one
CustomerProfile profile = customerProfileState.value();
if (profile == null) {
profile = new CustomerProfile(customerId);
}
// Update profile with new transaction
profile.updateWithTransaction(transaction);
// Update state
customerProfileState.update(profile);
// Emit updated profile
out.collect(profile);
}
}
/**
* Detects high-frequency transactions (potential fraud)
*/
public class HighFrequencyDetectionFunction extends RichFlatMapFunction<TransactionEvent, FraudAlert> {
private transient ListState<TransactionEvent> recentTransactionsState;
private transient ValueState<Integer> transactionCountState;
private final int frequencyThreshold;
private final Duration timeWindow;
public HighFrequencyDetectionFunction(int frequencyThreshold, Duration timeWindow) {
this.frequencyThreshold = frequencyThreshold;
this.timeWindow = timeWindow;
}
@Override
public void open(Configuration parameters) throws Exception {
// ListState to store recent transactions
ListStateDescriptor<TransactionEvent> transactionsDescriptor =
new ListStateDescriptor<>("recentTransactions", TransactionEvent.class);
// Configure TTL for list state
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.milliseconds(timeWindow.toMillis()))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
transactionsDescriptor.enableTimeToLive(ttlConfig);
recentTransactionsState = getRuntimeContext().getListState(transactionsDescriptor);
// ValueState for transaction count
ValueStateDescriptor<Integer> countDescriptor =
new ValueStateDescriptor<>("transactionCount", Integer.class);
transactionCountState = getRuntimeContext().getState(countDescriptor);
}
@Override
public void flatMap(TransactionEvent transaction, Collector<FraudAlert> out) throws Exception {
String customerId = transaction.getCustomerId();
// Use the event timestamp rather than wall-clock time, so the cleanup
// below compares like with like (both sides are event time)
Instant now = transaction.getTimestamp();
// Clean old transactions
cleanupOldTransactions(now);
// Add current transaction
recentTransactionsState.add(transaction);
// Update count
Integer currentCount = transactionCountState.value();
if (currentCount == null) {
currentCount = 0;
}
currentCount++;
transactionCountState.update(currentCount);
// Check threshold
if (currentCount >= frequencyThreshold) {
FraudAlert alert = new FraudAlert(
customerId,
"High transaction frequency: " + currentCount + " transactions in " + timeWindow,
transaction.getAmount(),
currentCount > frequencyThreshold * 2 ? AlertSeverity.HIGH : AlertSeverity.MEDIUM
);
out.collect(alert);
}
}
private void cleanupOldTransactions(Instant currentTime) throws Exception {
List<TransactionEvent> currentTransactions = new ArrayList<>();
Instant cutoff = currentTime.minus(timeWindow);
for (TransactionEvent transaction : recentTransactionsState.get()) {
if (transaction.getTimestamp().isAfter(cutoff)) {
currentTransactions.add(transaction);
}
}
// Update state with cleaned list
recentTransactionsState.update(currentTransactions);
// Update count
transactionCountState.update(currentTransactions.size());
}
}
/**
* Detects unusual spending patterns
*/
public class SpendingPatternFunction extends RichFlatMapFunction<TransactionEvent, FraudAlert> {
private transient ValueState<Double> averageSpendState;
private transient ValueState<Double> maxSpendState;
private transient ListState<Double> recentSpendsState;
private final double deviationThreshold; // e.g., 3.0 for 3 standard deviations
private final int sampleSize;
public SpendingPatternFunction(double deviationThreshold, int sampleSize) {
this.deviationThreshold = deviationThreshold;
this.sampleSize = sampleSize;
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Double> avgDescriptor =
new ValueStateDescriptor<>("averageSpend", Double.class);
ValueStateDescriptor<Double> maxDescriptor =
new ValueStateDescriptor<>("maxSpend", Double.class);
ListStateDescriptor<Double> recentDescriptor =
new ListStateDescriptor<>("recentSpends", Double.class);
averageSpendState = getRuntimeContext().getState(avgDescriptor);
maxSpendState = getRuntimeContext().getState(maxDescriptor);
recentSpendsState = getRuntimeContext().getListState(recentDescriptor);
}
@Override
public void flatMap(TransactionEvent transaction, Collector<FraudAlert> out) throws Exception {
double currentAmount = transaction.getAmount();
// Get historical data
Double historicalAverage = averageSpendState.value();
Double historicalMax = maxSpendState.value();
List<Double> recentSpends = new ArrayList<>();
for (Double spend : recentSpendsState.get()) {
recentSpends.add(spend);
}
// Check for anomalies
if (historicalAverage != null && historicalMax != null) {
boolean isUnusual = detectUnusualSpending(currentAmount, historicalAverage, historicalMax, recentSpends);
if (isUnusual) {
FraudAlert alert = new FraudAlert(
transaction.getCustomerId(),
String.format("Unusual spending pattern: $%.2f (avg: $%.2f, max: $%.2f)",
currentAmount, historicalAverage, historicalMax),
currentAmount,
currentAmount > historicalMax * 2 ? AlertSeverity.HIGH : AlertSeverity.MEDIUM
);
out.collect(alert);
}
}
// Update state
updateSpendingPatterns(currentAmount, recentSpends);
}
private boolean detectUnusualSpending(double currentAmount, double average, double max, List<Double> recentSpends) {
// Rule 1: Significantly above historical maximum
if (currentAmount > max * 1.5) {
return true;
}
// Rule 2: Statistical outlier (z-score)
if (recentSpends.size() >= 10) {
double mean = recentSpends.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double stdDev = calculateStandardDeviation(recentSpends, mean);
if (stdDev > 0) {
double zScore = Math.abs((currentAmount - mean) / stdDev);
if (zScore > deviationThreshold) {
return true;
}
}
}
return false;
}
private void updateSpendingPatterns(double currentAmount, List<Double> recentSpends) throws Exception {
// Add current amount to recent spends
recentSpends.add(currentAmount);
// Maintain sample size
while (recentSpends.size() > sampleSize) {
recentSpends.remove(0);
}
// Update statistics
double newAverage = recentSpends.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double newMax = recentSpends.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
averageSpendState.update(newAverage);
maxSpendState.update(newMax);
recentSpendsState.update(recentSpends);
}
private double calculateStandardDeviation(List<Double> values, double mean) {
double variance = values.stream()
.mapToDouble(v -> Math.pow(v - mean, 2))
.average().orElse(0.0);
return Math.sqrt(variance);
}
}
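The z-score test in SpendingPatternFunction is ordinary statistics and can be verified in isolation. A self-contained sketch of the same math (mirroring calculateStandardDeviation's population standard deviation):

```java
import java.util.List;

public class ZScoreCheck {
    // Population standard deviation, matching calculateStandardDeviation above
    static double stdDev(List<Double> values, double mean) {
        double variance = values.stream()
            .mapToDouble(v -> Math.pow(v - mean, 2))
            .average().orElse(0.0);
        return Math.sqrt(variance);
    }

    // Flags an amount whose z-score against recent history exceeds the threshold
    static boolean isOutlier(double amount, List<Double> history, double threshold) {
        double mean = history.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        double sd = stdDev(history, mean);
        return sd > 0 && Math.abs((amount - mean) / sd) > threshold;
    }

    public static void main(String[] args) {
        // Ten spends around $100 with small spread, then a $500 transaction
        List<Double> history = List.of(95.0, 100.0, 105.0, 98.0, 102.0,
                                       97.0, 103.0, 99.0, 101.0, 100.0);
        System.out.println(isOutlier(500.0, history, 3.0)); // true
        System.out.println(isOutlier(104.0, history, 3.0)); // false
    }
}
```

With this history the standard deviation is about 2.8, so a $500 transaction sits well beyond 3 standard deviations while $104 does not.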
4. Advanced Stateful Processing with Windows
package com.flink.stateful.functions;
import com.flink.stateful.models.*;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
/**
* Session window processing for customer activity sessions
*/
public class SessionWindowProcessing {
public static DataStream<CustomerSession> processSessions(DataStream<TransactionEvent> transactions) {
return transactions
.assignTimestampsAndWatermarks(
WatermarkStrategy.<TransactionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) ->
event.getTimestamp().toEpochMilli())
)
.keyBy(TransactionEvent::getCustomerId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator(), new SessionProcessFunction());
}
// Aggregator for session data
public static class SessionAggregator implements AggregateFunction<
TransactionEvent, SessionAccumulator, SessionAccumulator> {
@Override
public SessionAccumulator createAccumulator() {
return new SessionAccumulator();
}
@Override
public SessionAccumulator add(TransactionEvent transaction, SessionAccumulator accumulator) {
return accumulator.addTransaction(transaction);
}
@Override
public SessionAccumulator getResult(SessionAccumulator accumulator) {
return accumulator;
}
@Override
public SessionAccumulator merge(SessionAccumulator a, SessionAccumulator b) {
return a.merge(b);
}
}
// Process function for session windows
public static class SessionProcessFunction extends ProcessWindowFunction<
SessionAccumulator, CustomerSession, String, TimeWindow> {
@Override
public void process(String customerId, Context context,
Iterable<SessionAccumulator> elements, Collector<CustomerSession> out) {
SessionAccumulator accumulator = elements.iterator().next();
CustomerSession session = accumulator.toCustomerSession(customerId, context.window());
out.collect(session);
}
}
// Accumulator for session data
public static class SessionAccumulator {
private Instant sessionStart;
private Instant sessionEnd;
private double totalAmount;
private int transactionCount;
private Set<String> merchants;
private TransactionEvent firstTransaction;
private TransactionEvent lastTransaction;
public SessionAccumulator() {
this.merchants = new HashSet<>();
}
public SessionAccumulator addTransaction(TransactionEvent transaction) {
if (sessionStart == null || transaction.getTimestamp().isBefore(sessionStart)) {
sessionStart = transaction.getTimestamp();
firstTransaction = transaction;
}
if (sessionEnd == null || transaction.getTimestamp().isAfter(sessionEnd)) {
sessionEnd = transaction.getTimestamp();
lastTransaction = transaction;
}
totalAmount += transaction.getAmount();
transactionCount++;
merchants.add(transaction.getMerchantId());
return this;
}
public SessionAccumulator merge(SessionAccumulator other) {
if (other.sessionStart != null &&
(sessionStart == null || other.sessionStart.isBefore(sessionStart))) {
sessionStart = other.sessionStart;
firstTransaction = other.firstTransaction;
}
if (other.sessionEnd != null &&
(sessionEnd == null || other.sessionEnd.isAfter(sessionEnd))) {
sessionEnd = other.sessionEnd;
lastTransaction = other.lastTransaction;
}
totalAmount += other.totalAmount;
transactionCount += other.transactionCount;
merchants.addAll(other.merchants);
return this;
}
public CustomerSession toCustomerSession(String customerId, TimeWindow window) {
return new CustomerSession(
UUID.randomUUID().toString(),
customerId,
sessionStart,
sessionEnd,
totalAmount,
transactionCount,
new ArrayList<>(merchants),
firstTransaction,
lastTransaction,
window.getStart(),
window.getEnd()
);
}
}
}
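The session-window assignment above can be reasoned about without a cluster: for one key, consecutive events belong to the same session as long as the gap between them stays under the configured gap. A plain-Java sketch of that grouping rule (a deliberate simplification of EventTimeSessionWindows; it assumes timestamps arrive sorted and ignores window merging for out-of-order events):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapDemo {
    // Splits ascending event timestamps (millis) into sessions: a new session
    // starts whenever the gap to the previous event reaches gapMillis
    static List<List<Long>> sessionize(List<Long> sortedTimestamps, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) >= gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 30 * 60 * 1000L; // 30-minute gap, as in processSessions
        // Three events a minute apart, then one two hours later
        List<Long> ts = List.of(0L, 60_000L, 120_000L, 7_200_000L);
        System.out.println(sessionize(ts, gap).size()); // 2 sessions
    }
}
```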
/**
* Customer session model
*/
class CustomerSession {
private String sessionId;
private String customerId;
private Instant startTime;
private Instant endTime;
private double totalAmount;
private int transactionCount;
private List<String> merchants;
private TransactionEvent firstTransaction;
private TransactionEvent lastTransaction;
private long windowStart;
private long windowEnd;
public CustomerSession(String sessionId, String customerId, Instant startTime, Instant endTime,
double totalAmount, int transactionCount, List<String> merchants,
TransactionEvent firstTransaction, TransactionEvent lastTransaction,
long windowStart, long windowEnd) {
this.sessionId = sessionId;
this.customerId = customerId;
this.startTime = startTime;
this.endTime = endTime;
this.totalAmount = totalAmount;
this.transactionCount = transactionCount;
this.merchants = merchants;
this.firstTransaction = firstTransaction;
this.lastTransaction = lastTransaction;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
// Getters and setters
public String getSessionId() { return sessionId; }
public void setSessionId(String sessionId) { this.sessionId = sessionId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public Instant getStartTime() { return startTime; }
public void setStartTime(Instant startTime) { this.startTime = startTime; }
public Instant getEndTime() { return endTime; }
public void setEndTime(Instant endTime) { this.endTime = endTime; }
public double getTotalAmount() { return totalAmount; }
public void setTotalAmount(double totalAmount) { this.totalAmount = totalAmount; }
public int getTransactionCount() { return transactionCount; }
public void setTransactionCount(int transactionCount) { this.transactionCount = transactionCount; }
public List<String> getMerchants() { return merchants; }
public void setMerchants(List<String> merchants) { this.merchants = merchants; }
public TransactionEvent getFirstTransaction() { return firstTransaction; }
public void setFirstTransaction(TransactionEvent firstTransaction) { this.firstTransaction = firstTransaction; }
public TransactionEvent getLastTransaction() { return lastTransaction; }
public void setLastTransaction(TransactionEvent lastTransaction) { this.lastTransaction = lastTransaction; }
public long getWindowStart() { return windowStart; }
public void setWindowStart(long windowStart) { this.windowStart = windowStart; }
public long getWindowEnd() { return windowEnd; }
public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
@Override
public String toString() {
return "CustomerSession{" +
"sessionId='" + sessionId + '\'' +
", customerId='" + customerId + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", totalAmount=" + totalAmount +
", transactionCount=" + transactionCount +
", merchants=" + merchants +
", windowStart=" + windowStart +
", windowEnd=" + windowEnd +
'}';
}
}
/**
* Complex pattern detection using ProcessFunction
*/
public class ComplexPatternDetectionFunction extends KeyedProcessFunction<String, TransactionEvent, FraudAlert> {
private transient MapState<String, TransactionEvent> pendingRefundsState;
private transient ValueState<Instant> lastPurchaseTimeState;
private transient ListState<TransactionEvent> recentPurchasesState;
private final Duration refundTimeWindow;
private final int rapidPurchaseThreshold;
public ComplexPatternDetectionFunction(Duration refundTimeWindow, int rapidPurchaseThreshold) {
this.refundTimeWindow = refundTimeWindow;
this.rapidPurchaseThreshold = rapidPurchaseThreshold;
}
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, TransactionEvent> refundsDescriptor =
new MapStateDescriptor<>("pendingRefunds", String.class, TransactionEvent.class);
pendingRefundsState = getRuntimeContext().getMapState(refundsDescriptor);
ValueStateDescriptor<Instant> lastPurchaseDescriptor =
new ValueStateDescriptor<>("lastPurchaseTime", Instant.class);
lastPurchaseTimeState = getRuntimeContext().getState(lastPurchaseDescriptor);
ListStateDescriptor<TransactionEvent> recentPurchasesDescriptor =
new ListStateDescriptor<>("recentPurchases", TransactionEvent.class);
recentPurchasesState = getRuntimeContext().getListState(recentPurchasesDescriptor);
}
@Override
public void processElement(TransactionEvent transaction, Context ctx,
Collector<FraudAlert> out) throws Exception {
switch (transaction.getTransactionType()) {
case PURCHASE:
handlePurchase(transaction, ctx, out);
break;
case REFUND:
handleRefund(transaction, ctx, out);
break;
case CHARGEBACK:
handleChargeback(transaction, ctx, out);
break;
}
}
private void handlePurchase(TransactionEvent purchase, Context ctx, Collector<FraudAlert> out) throws Exception {
String customerId = purchase.getCustomerId();
// Check for rapid purchases
checkRapidPurchases(purchase, out);
// Store purchase for potential refund pattern detection
pendingRefundsState.put(purchase.getTransactionId(), purchase);
// Set timer for refund window expiration
long refundTimer = purchase.getTimestamp().plus(refundTimeWindow).toEpochMilli();
ctx.timerService().registerEventTimeTimer(refundTimer);
// Update last purchase time
lastPurchaseTimeState.update(purchase.getTimestamp());
// Add to recent purchases
recentPurchasesState.add(purchase);
}
private void handleRefund(TransactionEvent refund, Context ctx, Collector<FraudAlert> out) throws Exception {
// Check if refund matches a recent purchase
TransactionEvent originalPurchase = pendingRefundsState.get(refund.getTransactionId());
if (originalPurchase != null) {
// Valid refund - remove from pending
pendingRefundsState.remove(refund.getTransactionId());
// Check for suspicious refund patterns
if (refund.getAmount() == originalPurchase.getAmount()) {
// Full refund shortly after purchase might be suspicious
Duration timeBetween = Duration.between(
originalPurchase.getTimestamp(), refund.getTimestamp());
if (timeBetween.toMinutes() < 10) {
FraudAlert alert = new FraudAlert(
refund.getCustomerId(),
"Rapid full refund: purchase and refund within " + timeBetween.toMinutes() + " minutes",
refund.getAmount(),
AlertSeverity.MEDIUM
);
out.collect(alert);
}
}
} else {
// Refund without matching purchase - highly suspicious
FraudAlert alert = new FraudAlert(
refund.getCustomerId(),
"Refund without matching purchase: " + refund.getTransactionId(),
refund.getAmount(),
AlertSeverity.HIGH
);
out.collect(alert);
}
}
private void handleChargeback(TransactionEvent chargeback, Context ctx, Collector<FraudAlert> out) throws Exception {
// Chargebacks are always suspicious
FraudAlert alert = new FraudAlert(
chargeback.getCustomerId(),
"Chargeback filed: " + chargeback.getTransactionId(),
chargeback.getAmount(),
AlertSeverity.CRITICAL
);
out.collect(alert);
}
private void checkRapidPurchases(TransactionEvent purchase, Collector<FraudAlert> out) throws Exception {
Instant lastPurchaseTime = lastPurchaseTimeState.value();
if (lastPurchaseTime != null) {
Duration timeSinceLastPurchase = Duration.between(lastPurchaseTime, purchase.getTimestamp());
if (timeSinceLastPurchase.toSeconds() < 30) {
// Rapid purchase detected
FraudAlert alert = new FraudAlert(
purchase.getCustomerId(),
"Rapid purchase: " + timeSinceLastPurchase.toSeconds() + " seconds since last purchase",
purchase.getAmount(),
AlertSeverity.LOW
);
out.collect(alert);
}
}
// Check count of recent purchases
List<TransactionEvent> recentPurchases = new ArrayList<>();
for (TransactionEvent recent : recentPurchasesState.get()) {
recentPurchases.add(recent);
}
// Clean old purchases (older than 1 hour)
Instant oneHourAgo = purchase.getTimestamp().minus(Duration.ofHours(1));
recentPurchases.removeIf(tx -> tx.getTimestamp().isBefore(oneHourAgo));
if (recentPurchases.size() >= rapidPurchaseThreshold) {
FraudAlert alert = new FraudAlert(
purchase.getCustomerId(),
"High purchase frequency: " + recentPurchases.size() + " purchases in last hour",
purchase.getAmount(),
AlertSeverity.MEDIUM
);
out.collect(alert);
}
// Update state
recentPurchasesState.update(recentPurchases);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<FraudAlert> out) throws Exception {
// Clean up expired purchases from pending refunds
Instant cutoff = Instant.ofEpochMilli(timestamp).minus(refundTimeWindow);
Iterator<Map.Entry<String, TransactionEvent>> iterator = pendingRefundsState.iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionEvent> entry = iterator.next();
if (entry.getValue().getTimestamp().isBefore(cutoff)) {
iterator.remove();
}
}
}
}
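At its core, the pending-refund bookkeeping above is a map keyed by transaction ID plus time-based eviction, and that behavior can be exercised outside Flink. A minimal sketch (the PendingRefunds helper is hypothetical, not part of the application above) using event-time instants:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class PendingRefunds {
    private final Map<String, Instant> pending = new HashMap<>();
    private final Duration refundWindow;

    public PendingRefunds(Duration refundWindow) { this.refundWindow = refundWindow; }

    public void recordPurchase(String txId, Instant ts) { pending.put(txId, ts); }

    // true = matched a pending purchase (normal refund); false = suspicious
    public boolean matchRefund(String txId) { return pending.remove(txId) != null; }

    // Mirrors onTimer(): drop purchases that fell outside the refund window
    public void expire(Instant now) {
        Instant cutoff = now.minus(refundWindow);
        pending.entrySet().removeIf(e -> e.getValue().isBefore(cutoff));
    }

    public static void main(String[] args) {
        PendingRefunds refunds = new PendingRefunds(Duration.ofDays(30));
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        refunds.recordPurchase("tx-1", t0);
        refunds.recordPurchase("tx-2", t0.plus(Duration.ofDays(20)));
        refunds.expire(t0.plus(Duration.ofDays(35))); // tx-1 now outside the window
        System.out.println(refunds.matchRefund("tx-1")); // false -> would raise an alert
        System.out.println(refunds.matchRefund("tx-2")); // true
    }
}
```

In the Flink version, the map lives in MapState so it is checkpointed and partitioned by customer, and the eviction runs from event-time timers instead of an explicit expire() call.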
5. State Backend Configuration and Checkpointing
package com.flink.stateful.config;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class FlinkStateConfig {
public static void configureStateBackend(StreamExecutionEnvironment env, String checkpointDir) {
// Configure checkpointing
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// Checkpoints are written to this directory (file://, hdfs://, s3://, ...)
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
// Configure state backend
StateBackend stateBackend = createRocksDBStateBackend(checkpointDir);
env.setStateBackend(stateBackend);
// Configure restart strategy
env.setRestartStrategy(org.apache.flink.api.common.restartstrategy.RestartStrategies
.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.seconds(10)));
}
private static StateBackend createRocksDBStateBackend(String checkpointDir) {
// Incremental checkpoints: only state changed since the last checkpoint is uploaded.
// (checkpointDir itself is registered as checkpoint storage by the caller.)
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
// Low-level RocksDB tuning (compression, background jobs, open files) is done
// through a RocksDBOptionsFactory or state.backend.rocksdb.* settings in
// flink-conf.yaml; the backend does not accept raw org.rocksdb.Options.
return rocksDBStateBackend;
}
public static void configureRocksDBMemory(StreamExecutionEnvironment env) {
// Compress checkpoint snapshots to reduce their size
env.getConfig().setUseSnapshotCompression(true);
// RocksDB draws its block cache and write buffers from Flink's managed
// memory. The managed memory fraction is a cluster-level setting that
// cannot be changed per job from the ExecutionConfig; set it in
// flink-conf.yaml instead:
//   taskmanager.memory.managed.fraction: 0.3
}
}
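Because RocksDB sizes itself from Flink's managed memory, the memory-related knobs live in the cluster configuration rather than in application code. A flink-conf.yaml fragment along these lines (the values are illustrative) usually accompanies a RocksDB deployment:

```yaml
# Share of each TaskManager's Flink memory reserved as managed memory,
# which RocksDB uses for its block cache and write buffers
taskmanager.memory.managed.fraction: 0.3
# Keep RocksDB inside the managed memory budget (this is the default)
state.backend.rocksdb.memory.managed: true
# Incremental checkpoints upload only changed SST files
state.backend.incremental: true
```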
// State serialization configuration
class StateSerializationConfig {
public static void configureSerialization(StreamExecutionEnvironment env) {
// To register a custom Kryo serializer, pass a concrete subclass of
// com.esotericsoftware.kryo.Serializer (the abstract base class itself
// cannot be instantiated), e.g.:
// env.getConfig().registerTypeWithKryoSerializer(
//     TransactionEvent.class, MyTransactionEventSerializer.class);
// Prefer Flink's built-in POJO serializer (the default) for performance;
// disabling generic types makes any accidental Kryo fallback fail fast
// instead of silently taking the slow path
env.getConfig().disableGenericTypes();
}
}
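For reference, a concrete Kryo serializer subclasses com.esotericsoftware.kryo.Serializer and implements explicit write/read methods with matching field order. The sketch below is hypothetical: it assumes accessors such as getTransactionId() and getType() on the TransactionEvent model, and the constructor order used elsewhere in this article.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.time.Instant;

// Hypothetical hand-written serializer; write and read must agree on field order
public class TransactionEventKryoSerializer extends Serializer<TransactionEvent> {
    @Override
    public void write(Kryo kryo, Output output, TransactionEvent event) {
        output.writeString(event.getTransactionId());
        output.writeString(event.getCustomerId());
        output.writeString(event.getMerchantId());
        output.writeDouble(event.getAmount());
        output.writeString(event.getCurrency());
        output.writeLong(event.getTimestamp().toEpochMilli()); // Instant as epoch millis
        output.writeString(event.getType().name());            // enum as its name
    }

    @Override
    public TransactionEvent read(Kryo kryo, Input input, Class<TransactionEvent> type) {
        return new TransactionEvent(
            input.readString(),                    // transactionId
            input.readString(),                    // customerId
            input.readString(),                    // merchantId
            input.readDouble(),                    // amount
            input.readString(),                    // currency
            Instant.ofEpochMilli(input.readLong()),
            TransactionType.valueOf(input.readString()));
    }
}
```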
6. Complete Flink Application
package com.flink.stateful.app;
import com.flink.stateful.config.FlinkStateConfig;
import com.flink.stateful.functions.*;
import com.flink.stateful.models.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
/**
* Complete Fraud Detection Application using Flink Stateful Computations
*/
public class FraudDetectionApplication {
// Side output for different alert severities
private static final OutputTag<FraudAlert> LOW_SEVERITY_ALERTS =
new OutputTag<FraudAlert>("low-severity-alerts") {};
private static final OutputTag<FraudAlert> MEDIUM_SEVERITY_ALERTS =
new OutputTag<FraudAlert>("medium-severity-alerts") {};
private static final OutputTag<FraudAlert> HIGH_SEVERITY_ALERTS =
new OutputTag<FraudAlert>("high-severity-alerts") {};
public static void main(String[] args) throws Exception {
// Set up execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure state backend and checkpointing
FlinkStateConfig.configureStateBackend(env, "file:///tmp/flink-checkpoints");
FlinkStateConfig.configureRocksDBMemory(env);
// Create transaction source (Kafka in production, mock for demo)
DataStream<TransactionEvent> transactions = createTransactionSource(env);
// Build processing pipeline
buildFraudDetectionPipeline(transactions);
// Execute application
env.execute("Fraud Detection Application");
}
private static DataStream<TransactionEvent> createTransactionSource(StreamExecutionEnvironment env) {
// In production, use Kafka source
/*
KafkaSource<TransactionEvent> source = KafkaSource.<TransactionEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("transactions")
.setGroupId("fraud-detection")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new TransactionDeserializer())
.build();
return env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
*/
// For demo, create mock data
return env.addSource(new TransactionEventGenerator())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<TransactionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp().toEpochMilli())
)
.name("Transaction Source");
}
private static void buildFraudDetectionPipeline(DataStream<TransactionEvent> transactions) {
// Key by customer for stateful operations
org.apache.flink.streaming.api.datastream.KeyedStream<TransactionEvent, String> keyedTransactions = transactions
.keyBy(TransactionEvent::getCustomerId);
// 1. Customer Profile Building
SingleOutputStreamOperator<CustomerProfile> customerProfiles = keyedTransactions
.flatMap(new CustomerProfileFunction())
.name("Customer Profile Builder");
// 2. High Frequency Detection
SingleOutputStreamOperator<FraudAlert> frequencyAlerts = keyedTransactions
.flatMap(new HighFrequencyDetectionFunction(10, Duration.ofMinutes(5)))
.name("High Frequency Detector");
// 3. Spending Pattern Analysis
SingleOutputStreamOperator<FraudAlert> spendingAlerts = keyedTransactions
.flatMap(new SpendingPatternFunction(3.0, 50))
.name("Spending Pattern Analyzer");
// 4. Complex Pattern Detection (the stream is already keyed; no second keyBy needed)
SingleOutputStreamOperator<FraudAlert> complexAlerts = keyedTransactions
.process(new ComplexPatternDetectionFunction(Duration.ofHours(24), 20))
.name("Complex Pattern Detector");
// Merge all alerts
DataStream<FraudAlert> allAlerts = frequencyAlerts
.union(spendingAlerts, complexAlerts);
// Route alerts by severity
SingleOutputStreamOperator<FraudAlert> routedAlerts = allAlerts
.process(new AlertRouterProcessFunction());
// Get side outputs
DataStream<FraudAlert> lowAlerts = routedAlerts.getSideOutput(LOW_SEVERITY_ALERTS);
DataStream<FraudAlert> mediumAlerts = routedAlerts.getSideOutput(MEDIUM_SEVERITY_ALERTS);
DataStream<FraudAlert> highAlerts = routedAlerts.getSideOutput(HIGH_SEVERITY_ALERTS);
// Sinks for different alert severities
// (PrintSinkFunction takes a sink identifier plus a stdout/stderr flag)
lowAlerts.addSink(new PrintSinkFunction<>("LOW ALERT", false))
.name("Low Alert Sink");
mediumAlerts.addSink(new PrintSinkFunction<>("MEDIUM ALERT", false))
.name("Medium Alert Sink");
highAlerts.addSink(new PrintSinkFunction<>("HIGH ALERT", false))
.name("High Alert Sink");
// Additional: Session processing
DataStream<CustomerSession> customerSessions = SessionWindowProcessing.processSessions(transactions);
customerSessions.addSink(new PrintSinkFunction<>("SESSION", false))
.name("Session Sink");
// Sink customer profiles to external system (e.g., database)
customerProfiles.addSink(new CustomerProfileSink())
.name("Customer Profile Sink");
}
/**
* Routes alerts to different side outputs based on severity
*/
public static class AlertRouterProcessFunction extends org.apache.flink.streaming.api.functions.ProcessFunction<FraudAlert, FraudAlert> {
@Override
public void processElement(FraudAlert alert, Context ctx, Collector<FraudAlert> out) throws Exception {
switch (alert.getSeverity()) {
case LOW:
ctx.output(LOW_SEVERITY_ALERTS, alert);
break;
case MEDIUM:
ctx.output(MEDIUM_SEVERITY_ALERTS, alert);
break;
case HIGH:
case CRITICAL:
ctx.output(HIGH_SEVERITY_ALERTS, alert);
break;
}
// Also emit to main output for logging
out.collect(alert);
}
}
/**
* Mock transaction generator for demo purposes
*/
public static class TransactionEventGenerator implements org.apache.flink.streaming.api.functions.source.SourceFunction<TransactionEvent> {
private volatile boolean running = true;
private final Random random = new Random();
@Override
public void run(SourceContext<TransactionEvent> ctx) throws Exception {
int eventId = 0;
while (running && eventId < 10000) {
String customerId = "customer-" + random.nextInt(100);
String merchantId = "merchant-" + random.nextInt(50);
double amount = 10 + random.nextDouble() * 990; // $10 to $1000
TransactionEvent event = new TransactionEvent(
"tx-" + eventId++,
customerId,
merchantId,
amount,
"USD",
Instant.now(),
TransactionType.PURCHASE
);
ctx.collect(event);
// Random delay between events
Thread.sleep(random.nextInt(100));
}
}
@Override
public void cancel() {
running = false;
}
}
/**
* Sink for customer profiles (to database, Kafka, etc.)
*/
public static class CustomerProfileSink implements SinkFunction<CustomerProfile> {
@Override
public void invoke(CustomerProfile profile, Context context) throws Exception {
// In production, write to database, Kafka, etc.
System.out.println("PROFILE UPDATE: " + profile.getCustomerId() +
" - Transactions: " + profile.getTransactionCount() +
" - Total: $" + profile.getTotalSpent());
}
}
}
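Once packaged, the application can be submitted with the Flink CLI. The jar name and parallelism below are placeholders for your own build artifacts:

```shell
# Submit the job with an explicit entry class and parallelism
flink run \
  --class com.flink.stateful.app.FraudDetectionApplication \
  --parallelism 4 \
  target/fraud-detection.jar
```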
7. Testing Stateful Functions
package com.flink.stateful.test;
import com.flink.stateful.functions.CustomerProfileFunction;
import com.flink.stateful.models.TransactionEvent;
import com.flink.stateful.models.TransactionType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.flink.stateful.models.CustomerProfile;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.*;
class CustomerProfileFunctionTest {
private KeyedOneInputStreamOperatorTestHarness<String, TransactionEvent, CustomerProfile> testHarness;
private CustomerProfileFunction function;
@BeforeEach
void setUp() throws Exception {
function = new CustomerProfileFunction();
// CustomerProfileFunction is a flatMap function, so wrap it in a StreamFlatMap
// operator (KeyedProcessOperator only accepts KeyedProcessFunctions), and the
// harness output type is the emitted CustomerProfile, not String
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new StreamFlatMap<>(function),
TransactionEvent::getCustomerId,
TypeInformation.of(String.class)
);
testHarness.open();
}
@Test
void testCustomerProfileCreation() throws Exception {
TransactionEvent event = new TransactionEvent(
"tx-1", "customer-1", "merchant-1", 100.0, "USD",
Instant.now(), TransactionType.PURCHASE
);
testHarness.processElement(new StreamRecord<>(event));
// Check that state was updated
// Note: In real tests, you'd access state through the function
assertEquals(1, testHarness.getOutput().size());
}
@Test
void testMultipleTransactionsSameCustomer() throws Exception {
String customerId = "customer-1";
TransactionEvent event1 = new TransactionEvent(
"tx-1", customerId, "merchant-1", 100.0, "USD",
Instant.now(), TransactionType.PURCHASE
);
TransactionEvent event2 = new TransactionEvent(
"tx-2", customerId, "merchant-2", 200.0, "USD",
Instant.now().plusSeconds(10), TransactionType.PURCHASE
);
testHarness.processElement(new StreamRecord<>(event1));
testHarness.processElement(new StreamRecord<>(event2));
assertEquals(2, testHarness.getOutput().size());
}
@Test
void testDifferentCustomers() throws Exception {
TransactionEvent event1 = new TransactionEvent(
"tx-1", "customer-1", "merchant-1", 100.0, "USD",
Instant.now(), TransactionType.PURCHASE
);
TransactionEvent event2 = new TransactionEvent(
"tx-2", "customer-2", "merchant-1", 150.0, "USD",
Instant.now(), TransactionType.PURCHASE
);
testHarness.processElement(new StreamRecord<>(event1));
testHarness.processElement(new StreamRecord<>(event2));
assertEquals(2, testHarness.getOutput().size());
}
}
// Integration test with Testcontainers
class FraudDetectionIntegrationTest {
// This would use Testcontainers to spin up a mini Flink cluster
// and test the complete application
}
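As a lighter-weight alternative to Testcontainers, the flink-test-utils dependency declared earlier ships a JUnit 5 extension that runs jobs against an embedded MiniCluster in the same JVM. A sketch, with illustrative cluster sizes and a trivial stand-in pipeline:

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class FraudDetectionMiniClusterTest {
    // Spins up an in-JVM Flink cluster shared by all tests in this class
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(4)
            .build());

    @Test
    void pipelineRunsEndToEnd() throws Exception {
        // getExecutionEnvironment() automatically targets the MiniCluster here
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(x -> x * 2).print(); // stand-in for the real pipeline
        env.execute("mini-cluster smoke test");
    }
}
```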
Best Practices for Flink Stateful Computations
1. State Management:
- Use appropriate state types (ValueState, ListState, MapState)
- Implement state TTL for automatic cleanup
- Monitor state size and growth patterns
- Use RocksDB for large state that doesn't fit in memory
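State TTL from the list above is configured per state descriptor. A minimal sketch, assuming a per-customer profile ValueState like the one built earlier in this article (the 30-day lifetime is illustrative):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class ProfileStateTtl {
    // Call from a RichFunction's open() when creating the state descriptor
    public static ValueStateDescriptor<CustomerProfile> ttlProfileDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)        // refresh TTL on writes
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(1000) // purge expired entries during RocksDB compaction
            .build();

        ValueStateDescriptor<CustomerProfile> descriptor =
            new ValueStateDescriptor<>("customer-profile", CustomerProfile.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```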
2. Performance Optimization:
- Key by meaningful fields to distribute load
- Use efficient serialization (Kryo, Avro)
- Configure RocksDB options for your workload
- Monitor checkpointing performance
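On the serialization point above: Flink's built-in POJO serializer outperforms generic Kryo, but only for classes meeting the POJO rules, i.e. a public class with a public no-argument constructor whose fields are either public or exposed through matching getters and setters. A hypothetical state class that qualifies:

```java
// Meets Flink's POJO rules: public class, public no-arg constructor,
// and a getter/setter pair for every non-public field
public class MerchantStats {
    private String merchantId;
    private long transactionCount;
    private double totalVolume;

    public MerchantStats() {} // required no-arg constructor

    public String getMerchantId() { return merchantId; }
    public void setMerchantId(String merchantId) { this.merchantId = merchantId; }
    public long getTransactionCount() { return transactionCount; }
    public void setTransactionCount(long transactionCount) { this.transactionCount = transactionCount; }
    public double getTotalVolume() { return totalVolume; }
    public void setTotalVolume(double totalVolume) { this.totalVolume = totalVolume; }
}
```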
3. Fault Tolerance:
- Enable checkpointing with appropriate intervals
- Configure savepoints for planned maintenance
- Test failure recovery regularly
- Monitor checkpoint success rates
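Savepoints for planned maintenance (second item in the list above) are driven from the Flink CLI; the job ID and storage paths below are placeholders:

```shell
# Trigger a savepoint for a running job
flink savepoint <jobId> s3://bucket/savepoints/

# Stop the job gracefully, taking a final savepoint
flink stop --savepointPath s3://bucket/savepoints/ <jobId>

# Resume the (possibly upgraded) application from a savepoint
flink run --fromSavepoint s3://bucket/savepoints/<savepointDir> fraud-detection.jar
```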
4. Monitoring and Observability:
// Custom metrics for stateful functions
class MonitoredStatefulFunction extends RichFlatMapFunction<TransactionEvent, FraudAlert> {
private transient Counter transactionCounter;
private transient Meter alertMeter;
private transient Histogram amountHistogram;
@Override
public void open(Configuration parameters) throws Exception {
transactionCounter = getRuntimeContext().getMetricGroup().counter("transactionsProcessed");
alertMeter = getRuntimeContext().getMetricGroup().meter("alertsGenerated", new MeterView(60));
amountHistogram = getRuntimeContext().getMetricGroup().histogram("transactionAmounts", new DescriptiveStatisticsHistogram(1000));
}
@Override
public void flatMap(TransactionEvent transaction, Collector<FraudAlert> out) throws Exception {
transactionCounter.inc();
amountHistogram.update((long) transaction.getAmount());
// Processing logic... call alertMeter.markEvent() each time an alert is emitted
}
}
Conclusion
Flink stateful computations provide powerful capabilities for building sophisticated stream processing applications. Key benefits include:
- Complex Event Processing: Detect patterns across multiple events
- Real-time Analytics: Maintain and update aggregations continuously
- Sessionization: Group events into meaningful sessions
- Anomaly Detection: Identify outliers and suspicious patterns
Implementation Checklist:
- ✅ Choose appropriate state types for your use case
- ✅ Configure state TTL for automatic cleanup
- ✅ Set up proper checkpointing and state backend
- ✅ Implement efficient serialization
- ✅ Add comprehensive monitoring and metrics
- ✅ Write thorough unit and integration tests
- ✅ Plan for state migration and schema evolution
Stateful Flink applications are particularly well-suited for:
- Fraud detection systems
- Real-time recommendation engines
- IoT data processing pipelines
- Financial transaction monitoring
- User behavior analytics
By following these patterns and best practices, you can build robust, scalable stateful stream processing applications with Apache Flink that can handle complex business logic while maintaining high performance and reliability.