Mastering Complex Data Processing with Stream Patterns
Pattern matching in streams means detecting sequences, relationships, and conditions within a flow of data. The Java Stream API has no built-in way to match patterns across neighbouring elements, as some functional languages do, but we can build one from Stream API combinations, collectors, and custom utilities. Most of the utilities below first collect the stream into a list, because comparing an element with its neighbours needs indexed access.
Pattern Matching Concepts in Streams
Common Pattern Types:
- Sequence Patterns: Detecting ordered element sequences
- Condition Patterns: Finding elements that match specific conditions
- Grouping Patterns: Identifying patterns in grouped data
- Temporal Patterns: Time-based sequence detection
- Stateful Patterns: Patterns that depend on previous elements
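To make the difference between these pattern types concrete, here is a minimal sketch contrasting a condition pattern with a sequence pattern. The class and method names (PatternTypesSketch, aboveThreshold, indicesOfJumps) and the sample readings are illustrative assumptions, not part of the utilities developed later in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PatternTypesSketch {

    // Condition pattern: each element is judged on its own, so filter() is enough.
    static List<Integer> aboveThreshold(List<Integer> readings, int threshold) {
        return readings.stream()
                .filter(r -> r > threshold)
                .collect(Collectors.toList());
    }

    // Sequence pattern: the decision depends on the previous element, so we
    // stream over the indices of a materialized list instead of its elements.
    static List<Integer> indicesOfJumps(List<Integer> readings, int minJump) {
        return IntStream.range(1, readings.size())
                .filter(i -> readings.get(i) - readings.get(i - 1) >= minJump)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> readings = Arrays.asList(10, 12, 30, 31, 5, 40);
        System.out.println(aboveThreshold(readings, 25)); // [30, 31, 40]
        System.out.println(indicesOfJumps(readings, 15)); // [2, 5]
    }
}
```

The second method shows why most cross-element patterns need a list rather than a one-pass stream: a stream element has no access to its neighbours.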
1. Basic Pattern Matching Utilities
Let's start with foundational pattern matching utilities:
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
public class StreamPatternMatcher {
// 1. Find sequences of consecutive elements matching a predicate
public static <T> List<List<T>> findSequences(Stream<T> stream,
Predicate<T> predicate,
int minLength) {
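// The explicit type witness is needed: without a target type the compiler cannot infer
// the container type for the chained call. The combiner only concatenates partial results,
// so this method assumes a sequential stream.
return stream.<ArrayList<List<T>>>collect(ArrayList::new, (lists, element) -> {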
if (lists.isEmpty()) {
lists.add(new ArrayList<>());
}
List<T> currentList = lists.get(lists.size() - 1);
if (predicate.test(element)) {
currentList.add(element);
} else {
if (!currentList.isEmpty()) {
lists.add(new ArrayList<>());
}
}
}, ArrayList::addAll)
.stream()
.filter(list -> list.size() >= minLength)
.collect(Collectors.toList());
}
// 2. Find patterns where elements satisfy a sequence of predicates
@SafeVarargs
public static <T> List<List<T>> findPatternSequence(Stream<T> stream,
Predicate<T>... predicates) {
List<T> elements = stream.collect(Collectors.toList());
List<List<T>> results = new ArrayList<>();
for (int i = 0; i <= elements.size() - predicates.length; i++) {
boolean matches = true;
List<T> sequence = new ArrayList<>();
for (int j = 0; j < predicates.length && matches; j++) {
T element = elements.get(i + j);
if (predicates[j].test(element)) {
sequence.add(element);
} else {
matches = false;
}
}
if (matches) {
results.add(sequence);
}
}
return results;
}
// 3. Detect rising/falling sequences in numeric streams
public static <T extends Comparable<T>> List<List<T>> findMonotonicSequences(
Stream<T> stream, boolean rising, int minLength) {
List<T> elements = stream.collect(Collectors.toList());
List<List<T>> sequences = new ArrayList<>();
List<T> currentSequence = new ArrayList<>();
for (int i = 0; i < elements.size(); i++) {
T current = elements.get(i);
if (currentSequence.isEmpty()) {
currentSequence.add(current);
} else {
T previous = currentSequence.get(currentSequence.size() - 1);
int comparison = current.compareTo(previous);
boolean conditionMet = rising ? comparison > 0 : comparison < 0;
if (conditionMet) {
currentSequence.add(current);
} else {
if (currentSequence.size() >= minLength) {
sequences.add(new ArrayList<>(currentSequence));
}
currentSequence = new ArrayList<>();
currentSequence.add(current);
}
}
}
if (currentSequence.size() >= minLength) {
sequences.add(currentSequence);
}
return sequences;
}
// 4. Pattern matching with sliding window
public static <T> Stream<List<T>> slidingWindow(Stream<T> stream, int windowSize) {
List<T> elements = stream.collect(Collectors.toList());
return IntStream.range(0, elements.size() - windowSize + 1)
.mapToObj(i -> elements.subList(i, i + windowSize));
}
// 5. Find repeated patterns
public static <T> Map<List<T>, Long> findRepeatedPatterns(Stream<T> stream,
int patternLength,
long minOccurrences) {
List<T> elements = stream.collect(Collectors.toList());
return IntStream.range(0, elements.size() - patternLength + 1)
.mapToObj(i -> elements.subList(i, i + patternLength))
.collect(Collectors.groupingBy(pattern -> pattern, Collectors.counting()))
.entrySet().stream()
.filter(entry -> entry.getValue() >= minOccurrences)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
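The run detection in the first utility can also be written as a plain loop, which some readers may find easier to follow than a three-argument collect. The following is a minimal sketch under the assumption that the input is already a list and that minLength is at least 1; RunFinderSketch and findRuns are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class RunFinderSketch {

    // Collects maximal runs of consecutive elements that satisfy the predicate,
    // keeping only runs with at least minLength elements (assumes minLength >= 1).
    static <T> List<List<T>> findRuns(List<T> elements, Predicate<T> predicate, int minLength) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : elements) {
            if (predicate.test(element)) {
                current.add(element);
            } else {
                if (current.size() >= minLength) {
                    runs.add(current);
                }
                current = new ArrayList<>();
            }
        }
        // Flush the run that reaches the end of the input
        if (current.size() >= minLength) {
            runs.add(current);
        }
        return runs;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 4, 1, 6, 8, 10, 3, 12);
        // Runs of even numbers with at least two elements
        System.out.println(findRuns(numbers, n -> n % 2 == 0, 2)); // [[2, 4], [6, 8, 10]]
    }
}
```

The trailing 12 is dropped because a run of one element is shorter than the requested minimum.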
2. Advanced Pattern Matching Framework
Let's build a comprehensive pattern matching framework:
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
// Pattern matching result container
class PatternMatch<T> {
private final List<T> matchedElements;
private final int startIndex;
private final int endIndex;
private final Map<String, Object> metadata;
public PatternMatch(List<T> matchedElements, int startIndex, int endIndex) {
this.matchedElements = new ArrayList<>(matchedElements);
this.startIndex = startIndex;
this.endIndex = endIndex;
this.metadata = new HashMap<>();
}
public PatternMatch<T> withMetadata(String key, Object value) {
this.metadata.put(key, value);
return this;
}
// Getters
public List<T> getMatchedElements() { return matchedElements; }
public int getStartIndex() { return startIndex; }
public int getEndIndex() { return endIndex; }
public Map<String, Object> getMetadata() { return metadata; }
@Override
public String toString() {
return String.format("PatternMatch[%d-%d]: %s", startIndex, endIndex, matchedElements);
}
}
// Pattern specification interface
interface StreamPattern<T> {
Optional<PatternMatch<T>> match(List<T> elements, int startIndex);
String getDescription();
}
// Pattern matcher engine
public class AdvancedStreamPatternMatcher<T> {
private final List<StreamPattern<T>> patterns;
public AdvancedStreamPatternMatcher() {
this.patterns = new ArrayList<>();
}
public AdvancedStreamPatternMatcher<T> addPattern(StreamPattern<T> pattern) {
patterns.add(pattern);
return this;
}
public List<PatternMatch<T>> matchAll(Stream<T> stream) {
List<T> elements = stream.collect(Collectors.toList());
List<PatternMatch<T>> matches = new ArrayList<>();
for (int i = 0; i < elements.size(); i++) {
for (StreamPattern<T> pattern : patterns) {
Optional<PatternMatch<T>> match = pattern.match(elements, i);
match.ifPresent(matches::add);
}
}
return matches;
}
public <R> Stream<R> matchAndTransform(Stream<T> stream,
Function<PatternMatch<T>, R> transformer) {
return matchAll(stream).stream().map(transformer);
}
}
// Common pattern implementations
class SequencePattern<T> implements StreamPattern<T> {
private final List<Predicate<T>> sequencePredicates;
private final String description;
@SafeVarargs
public SequencePattern(Predicate<T>... predicates) {
this.sequencePredicates = Arrays.asList(predicates);
this.description = "Sequence of " + predicates.length + " elements";
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
if (startIndex + sequencePredicates.size() > elements.size()) {
return Optional.empty();
}
List<T> matched = new ArrayList<>();
for (int i = 0; i < sequencePredicates.size(); i++) {
T element = elements.get(startIndex + i);
if (sequencePredicates.get(i).test(element)) {
matched.add(element);
} else {
return Optional.empty();
}
}
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + sequencePredicates.size() - 1);
return Optional.of(match);
}
@Override
public String getDescription() {
return description;
}
}
class RepetitionPattern<T> implements StreamPattern<T> {
private final Predicate<T> elementPredicate;
private final int minRepetitions;
private final int maxRepetitions;
public RepetitionPattern(Predicate<T> elementPredicate, int minRepetitions, int maxRepetitions) {
this.elementPredicate = elementPredicate;
this.minRepetitions = minRepetitions;
this.maxRepetitions = maxRepetitions;
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
List<T> matched = new ArrayList<>();
int count = 0;
for (int i = startIndex; i < elements.size() && count < maxRepetitions; i++) {
T element = elements.get(i);
if (elementPredicate.test(element)) {
matched.add(element);
count++;
} else {
break;
}
}
if (count >= minRepetitions) {
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + count - 1);
match.withMetadata("repetitionCount", count);
return Optional.of(match);
}
return Optional.empty();
}
@Override
public String getDescription() {
return String.format("Repetition pattern: %d-%d repetitions", minRepetitions, maxRepetitions);
}
}
class RangePattern<T extends Comparable<T>> implements StreamPattern<T> {
private final T min;
private final T max;
private final int minLength;
public RangePattern(T min, T max, int minLength) {
this.min = min;
this.max = max;
this.minLength = minLength;
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
List<T> matched = new ArrayList<>();
for (int i = startIndex; i < elements.size(); i++) {
T element = elements.get(i);
if (element.compareTo(min) >= 0 && element.compareTo(max) <= 0) {
matched.add(element);
} else {
break;
}
}
if (matched.size() >= minLength) {
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + matched.size() - 1);
match.withMetadata("averageValue", calculateAverage(matched));
return Optional.of(match);
}
return Optional.empty();
}
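// Averages only the numeric elements. T is merely Comparable, so an unchecked cast to Number
// would throw ClassCastException for types such as String; those yield 0.0 instead.
private double calculateAverage(List<T> elements) {
return elements.stream()
.filter(e -> e instanceof Number)
.mapToDouble(e -> ((Number) e).doubleValue())
.average()
.orElse(0.0);
}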
@Override
public String getDescription() {
return String.format("Range pattern: %s to %s, min length %d", min, max, minLength);
}
}
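Note that a matcher which tries every start index reports overlapping matches: a run of three matching elements also contains a run of two that starts one position later. When that is not wanted, the scan can resume behind each match. The following is a minimal sketch of that idea for repetition-style patterns; NonOverlappingRunsSketch and its findRuns method are hypothetical and independent of the framework classes above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class NonOverlappingRunsSketch {

    // Returns {startIndex, endIndex} pairs (inclusive) for runs of minRun to maxRun
    // elements matching the predicate. After a match the scan resumes behind it,
    // so the reported matches never overlap.
    static <T> List<int[]> findRuns(List<T> elements, Predicate<T> predicate,
                                    int minRun, int maxRun) {
        List<int[]> matches = new ArrayList<>();
        int i = 0;
        while (i < elements.size()) {
            int length = 0;
            while (i + length < elements.size() && length < maxRun
                    && predicate.test(elements.get(i + length))) {
                length++;
            }
            if (length > 0 && length >= minRun) {
                matches.add(new int[] {i, i + length - 1});
                i += length; // skip the elements consumed by this match
            } else {
                i++;         // no match here, try the next start index
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "a", "a", "b", "c", "a", "a", "d");
        for (int[] match : findRuns(words, w -> w.equals("a"), 2, 5)) {
            System.out.println(Arrays.toString(match)); // [0, 2] then [5, 6]
        }
    }
}
```

Whether overlapping matches are a feature or noise depends on the use case, so it is worth stating the choice explicitly in a pattern's description.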
3. Real-World Pattern Matching Examples
Financial Data Patterns:
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.*;
// Financial data point
class StockTick {
private final String symbol;
private final double price;
private final double volume;
private final LocalDateTime timestamp;
public StockTick(String symbol, double price, double volume, LocalDateTime timestamp) {
this.symbol = symbol;
this.price = price;
this.volume = volume;
this.timestamp = timestamp;
}
// Getters
public String getSymbol() { return symbol; }
public double getPrice() { return price; }
public double getVolume() { return volume; }
public LocalDateTime getTimestamp() { return timestamp; }
@Override
public String toString() {
return String.format("%s: $%.2f (vol: %.0f)", symbol, price, volume);
}
}
public class FinancialPatternMatcher {
// 1. Detect price spikes
public static List<PatternMatch<StockTick>> findPriceSpikes(Stream<StockTick> ticks,
double spikeThreshold) {
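// A spike is a tick whose price rose by more than spikeThreshold relative to the previous tick.
// Assumption: spikeThreshold is a fraction (0.05 means 5%). Per-element predicates cannot
// compare neighbours, so this scans index pairs instead of using SequencePattern.
List<StockTick> tickList = ticks.collect(Collectors.toList());
return IntStream.range(1, tickList.size())
.filter(i -> {
double previousPrice = tickList.get(i - 1).getPrice();
double currentPrice = tickList.get(i).getPrice();
return previousPrice > 0 && (currentPrice - previousPrice) / previousPrice > spikeThreshold;
})
.mapToObj(i -> new PatternMatch<>(
Arrays.asList(tickList.get(i - 1), tickList.get(i)), i - 1, i))
.collect(Collectors.toList());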
}
// 2. Find volume surges
public static List<PatternMatch<StockTick>> findVolumeSurges(Stream<StockTick> ticks,
double volumeMultiplier) {
List<StockTick> tickList = ticks.collect(Collectors.toList());
return IntStream.range(1, tickList.size())
.filter(i -> {
double currentVolume = tickList.get(i).getVolume();
double previousVolume = tickList.get(i - 1).getVolume();
return currentVolume > previousVolume * volumeMultiplier;
})
.mapToObj(i -> {
List<StockTick> match = Arrays.asList(tickList.get(i - 1), tickList.get(i));
return new PatternMatch<>(match, i - 1, i)
.withMetadata("volumeIncrease",
tickList.get(i).getVolume() / tickList.get(i - 1).getVolume());
})
.collect(Collectors.toList());
}
// 3. Detect support/resistance levels
public static Map<Double, Long> findSupportResistanceLevels(Stream<StockTick> ticks,
double priceGranularity) {
return ticks.collect(Collectors.groupingBy(
tick -> Math.round(tick.getPrice() / priceGranularity) * priceGranularity,
Collectors.counting()
)).entrySet().stream()
.filter(entry -> entry.getValue() > 1) // At least 2 touches
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 4. Pattern: local peak (two rising ticks, the peak, then two falling ticks)
public static List<List<StockTick>> findRisingPeaks(Stream<StockTick> ticks) {
List<StockTick> tickList = ticks.collect(Collectors.toList());
List<List<StockTick>> peaks = new ArrayList<>();
for (int i = 2; i < tickList.size() - 2; i++) {
StockTick prev2 = tickList.get(i - 2);
StockTick prev1 = tickList.get(i - 1);
StockTick current = tickList.get(i);
StockTick next1 = tickList.get(i + 1);
StockTick next2 = tickList.get(i + 2);
// Check that current is a local peak: prices rise into it and keep falling after it
boolean isPeak = current.getPrice() > prev1.getPrice() &&
current.getPrice() > next1.getPrice();
boolean isRising = prev2.getPrice() < prev1.getPrice() &&
prev1.getPrice() < current.getPrice();
boolean keepsFalling = next1.getPrice() > next2.getPrice(); // decline continues after the peak
if (isPeak && isRising && keepsFalling) {
peaks.add(Arrays.asList(prev2, prev1, current, next1, next2));
}
}
return peaks;
}
}
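One detail of the support/resistance grouping deserves care: rounding a price to a multiple of the granularity yields double keys, and two prices in the same bucket can end up with slightly different keys because of floating-point rounding. A possible alternative, sketched below under the assumption that plain prices are enough for illustration, is to key the map by the integer bucket index. PriceBucketSketch and countTouches are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class PriceBucketSketch {

    // Counts how many prices fall into each bucket of width `granularity`.
    // Keys are bucket indices (price / granularity, rounded to the nearest long),
    // which avoids using doubles as map keys.
    static Map<Long, Long> countTouches(List<Double> prices, double granularity) {
        return prices.stream()
                .collect(Collectors.groupingBy(
                        p -> Math.round(p / granularity),
                        TreeMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Double> prices = Arrays.asList(150.1, 150.2, 151.0, 149.9, 151.1);
        // Bucket 300 corresponds to 150.0 and bucket 302 to 151.0 at granularity 0.5
        System.out.println(countTouches(prices, 0.5)); // {300=3, 302=2}
    }
}
```

Multiplying a bucket index by the granularity recovers the price level for display.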
Log Analysis Patterns:
import java.util.*;
import java.util.stream.*;
class LogEntry {
private final String timestamp;
private final String level;
private final String message;
private final String source;
public LogEntry(String timestamp, String level, String message, String source) {
this.timestamp = timestamp;
this.level = level;
this.message = message;
this.source = source;
}
// Getters
public String getTimestamp() { return timestamp; }
public String getLevel() { return level; }
public String getMessage() { return message; }
public String getSource() { return source; }
public boolean isError() { return "ERROR".equals(level); }
public boolean isWarning() { return "WARN".equals(level); }
@Override
public String toString() {
return String.format("[%s] %s: %s", timestamp, level, message);
}
}
public class LogPatternMatcher {
// 1. Detect error cascades (multiple errors in short sequence)
public static List<PatternMatch<LogEntry>> findErrorCascades(Stream<LogEntry> logs,
int maxGapBetweenErrors) {
List<LogEntry> logList = logs.collect(Collectors.toList());
List<PatternMatch<LogEntry>> cascades = new ArrayList<>();
List<LogEntry> currentCascade = new ArrayList<>();
int cascadeStartIndex = -1;
int lastErrorIndex = -1;
for (int i = 0; i < logList.size(); i++) {
LogEntry entry = logList.get(i);
if (entry.isError()) {
boolean continuesCascade = !currentCascade.isEmpty()
&& (i - lastErrorIndex) <= maxGapBetweenErrors;
if (!continuesCascade) {
// Close the previous cascade. Its indices run from its first to its last error,
// which may span non-error entries that lie between the errors.
if (currentCascade.size() >= 2) {
cascades.add(new PatternMatch<>(currentCascade, cascadeStartIndex, lastErrorIndex));
}
currentCascade = new ArrayList<>();
cascadeStartIndex = i;
}
currentCascade.add(entry);
lastErrorIndex = i;
}
}
// Don't forget the last cascade
if (currentCascade.size() >= 2) {
cascades.add(new PatternMatch<>(currentCascade, cascadeStartIndex, lastErrorIndex));
}
return cascades;
}
// 2. Find patterns of warnings followed by errors
public static List<PatternMatch<LogEntry>> findWarningErrorPatterns(Stream<LogEntry> logs) {
AdvancedStreamPatternMatcher<LogEntry> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(new SequencePattern<>(
LogEntry::isWarning,
LogEntry::isError
));
return matcher.matchAll(logs);
}
// 3. Detect repeated error messages
public static Map<String, Long> findRepeatedErrors(Stream<LogEntry> logs,
int minRepetitions) {
return logs.filter(LogEntry::isError)
.collect(Collectors.groupingBy(
LogEntry::getMessage,
Collectors.counting()
))
.entrySet().stream()
.filter(entry -> entry.getValue() >= minRepetitions)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 4. Pattern: Error -> Warning -> Error (sandwich pattern)
public static List<List<LogEntry>> findErrorSandwichPatterns(Stream<LogEntry> logs) {
List<LogEntry> logList = logs.collect(Collectors.toList());
List<List<LogEntry>> sandwiches = new ArrayList<>();
for (int i = 1; i < logList.size() - 1; i++) {
LogEntry prev = logList.get(i - 1);
LogEntry current = logList.get(i);
LogEntry next = logList.get(i + 1);
if (prev.isError() && current.isWarning() && next.isError()) {
sandwiches.add(Arrays.asList(prev, current, next));
}
}
return sandwiches;
}
}
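The warning-then-error pattern above only matches entries that are directly adjacent. In real logs, unrelated entries often sit between a warning and the error it precedes. The following is a minimal sketch of a relaxed variant that allows a bounded distance; it works on plain level strings for brevity, and WarningThenErrorSketch, find, and maxDistance are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WarningThenErrorSketch {

    // Returns {warningIndex, errorIndex} for each WARN that is followed by an
    // ERROR at most maxDistance positions later (1 means directly adjacent).
    static List<int[]> find(List<String> levels, int maxDistance) {
        List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < levels.size(); i++) {
            if (!"WARN".equals(levels.get(i))) {
                continue;
            }
            int limit = Math.min(levels.size() - 1, i + maxDistance);
            for (int j = i + 1; j <= limit; j++) {
                if ("ERROR".equals(levels.get(j))) {
                    pairs.add(new int[] {i, j});
                    break; // pair each warning with the first error only
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> levels = Arrays.asList(
                "INFO", "WARN", "INFO", "ERROR", "WARN", "INFO", "INFO", "INFO", "ERROR");
        for (int[] pair : find(levels, 2)) {
            System.out.println(Arrays.toString(pair)); // [1, 3]
        }
    }
}
```

With a distance of 2, the second warning at index 4 is not reported because its error arrives four entries later.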
4. Temporal Pattern Matching
import java.time.*;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.*;
class TemporalEvent {
private final String eventType;
private final LocalDateTime timestamp;
private final Map<String, Object> data;
public TemporalEvent(String eventType, LocalDateTime timestamp, Map<String, Object> data) {
this.eventType = eventType;
this.timestamp = timestamp;
this.data = new HashMap<>(data);
}
// Getters
public String getEventType() { return eventType; }
public LocalDateTime getTimestamp() { return timestamp; }
public Map<String, Object> getData() { return data; }
@Override
public String toString() {
return String.format("%s @ %s", eventType, timestamp);
}
}
public class TemporalPatternMatcher {
// 1. Find events occurring within a time window
public static List<PatternMatch<TemporalEvent>> findEventsInTimeWindow(
Stream<TemporalEvent> events, Duration windowSize, int minEvents) {
List<TemporalEvent> eventList = events
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
for (int i = 0; i < eventList.size(); i++) {
TemporalEvent startEvent = eventList.get(i);
LocalDateTime windowEnd = startEvent.getTimestamp().plus(windowSize);
List<TemporalEvent> windowEvents = new ArrayList<>();
windowEvents.add(startEvent);
for (int j = i + 1; j < eventList.size(); j++) {
TemporalEvent currentEvent = eventList.get(j);
if (currentEvent.getTimestamp().isBefore(windowEnd) ||
currentEvent.getTimestamp().equals(windowEnd)) {
windowEvents.add(currentEvent);
} else {
break;
}
}
if (windowEvents.size() >= minEvents) {
matches.add(new PatternMatch<>(
windowEvents, i, i + windowEvents.size() - 1
).withMetadata("windowDuration", windowSize));
}
}
return matches;
}
// 2. Detect event sequences with specific timing patterns
public static List<PatternMatch<TemporalEvent>> findTimedSequences(
Stream<TemporalEvent> events,
List<String> eventSequence,
Duration maxGapBetweenEvents) {
List<TemporalEvent> eventList = events
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
for (int i = 0; i <= eventList.size() - eventSequence.size(); i++) {
boolean sequenceMatches = true;
List<TemporalEvent> matchedEvents = new ArrayList<>();
for (int j = 0; j < eventSequence.size() && sequenceMatches; j++) {
TemporalEvent currentEvent = eventList.get(i + j);
// Check event type
if (!eventSequence.get(j).equals(currentEvent.getEventType())) {
sequenceMatches = false;
break;
}
// Check timing (except for first event)
if (j > 0) {
TemporalEvent previousEvent = matchedEvents.get(j - 1);
Duration gap = Duration.between(
previousEvent.getTimestamp(),
currentEvent.getTimestamp()
);
if (gap.compareTo(maxGapBetweenEvents) > 0) {
sequenceMatches = false;
break;
}
}
matchedEvents.add(currentEvent);
}
if (sequenceMatches) {
matches.add(new PatternMatch<>(
matchedEvents, i, i + eventSequence.size() - 1
));
}
}
return matches;
}
// 3. Find periodic event patterns
public static List<PatternMatch<TemporalEvent>> findPeriodicPatterns(
Stream<TemporalEvent> events,
String eventType,
Duration expectedPeriod,
Duration tolerance) {
List<TemporalEvent> eventList = events
.filter(e -> e.getEventType().equals(eventType))
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
List<TemporalEvent> currentPattern = new ArrayList<>();
for (int i = 1; i < eventList.size(); i++) {
TemporalEvent current = eventList.get(i);
TemporalEvent previous = eventList.get(i - 1);
Duration actualGap = Duration.between(previous.getTimestamp(), current.getTimestamp());
Duration gapDifference = actualGap.minus(expectedPeriod).abs();
if (gapDifference.compareTo(tolerance) <= 0) {
if (currentPattern.isEmpty()) {
currentPattern.add(previous);
}
currentPattern.add(current);
} else {
if (currentPattern.size() >= 3) { // At least 3 events for a pattern
matches.add(new PatternMatch<>(
new ArrayList<>(currentPattern),
i - currentPattern.size(), // the run is contiguous in the filtered list and ends at i - 1
i - 1
));
}
currentPattern.clear();
}
}
// Check last pattern
if (currentPattern.size() >= 3) {
matches.add(new PatternMatch<>(
new ArrayList<>(currentPattern),
eventList.size() - currentPattern.size(), // the last run ends at the final event
eventList.size() - 1
));
}
return matches;
}
}
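The time-window search above rescans forward from every start event. When only the densest window matters, a two-pointer scan over the sorted timestamps visits each event a bounded number of times. The following is a minimal sketch, assuming the timestamps are already sorted in ascending order; TimeWindowSketch and maxEventsInWindow are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

class TimeWindowSketch {

    // Largest number of events inside any window of the given length.
    // Assumes timestamps are sorted ascending; the window bounds are inclusive.
    static int maxEventsInWindow(List<LocalDateTime> sortedTimestamps, Duration window) {
        int best = 0;
        int left = 0;
        for (int right = 0; right < sortedTimestamps.size(); right++) {
            // Advance the left edge until the span from left to right fits the window
            while (Duration.between(sortedTimestamps.get(left), sortedTimestamps.get(right))
                    .compareTo(window) > 0) {
                left++;
            }
            best = Math.max(best, right - left + 1);
        }
        return best;
    }

    public static void main(String[] args) {
        LocalDateTime base = LocalDateTime.of(2024, 1, 1, 12, 0);
        List<LocalDateTime> timestamps = Arrays.asList(
                base, base.plusSeconds(10), base.plusSeconds(20),
                base.plusSeconds(90), base.plusSeconds(100));
        System.out.println(maxEventsInWindow(timestamps, Duration.ofSeconds(30))); // 3
    }
}
```

The inner loop always terminates because the span from an event to itself is zero, which never exceeds a non-negative window.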
5. Stateful Pattern Matching
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
// State machine for complex pattern matching
class PatternStateMachine<T> {
private final List<State<T>> states;
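// Package-private so that callers in the same package (such as SecurityPatternMatcher) can attach transitions
final State<T> startState;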
private final List<BiConsumer<List<T>, Map<String, Object>>> matchCallbacks;
public PatternStateMachine() {
this.states = new ArrayList<>();
this.matchCallbacks = new ArrayList<>();
this.startState = new State<>("start");
states.add(startState);
}
public State<T> addState(String name) {
State<T> state = new State<>(name);
states.add(state);
return state;
}
public void addMatchCallback(BiConsumer<List<T>, Map<String, Object>> callback) {
matchCallbacks.add(callback);
}
public List<PatternMatch<T>> match(Stream<T> stream) {
List<T> elements = stream.collect(Collectors.toList());
List<PatternMatch<T>> matches = new ArrayList<>();
Map<String, Object> context = new HashMap<>();
for (int startIdx = 0; startIdx < elements.size(); startIdx++) {
State<T> currentState = startState;
List<T> matchedElements = new ArrayList<>();
int currentIdx = startIdx;
while (currentIdx < elements.size() && currentState != null) {
T currentElement = elements.get(currentIdx);
State<T> nextState = currentState.transition(currentElement, context);
if (nextState != null) {
matchedElements.add(currentElement);
currentState = nextState;
currentIdx++;
// Check if we reached a final state
if (currentState.isFinal()) {
PatternMatch<T> match = new PatternMatch<>(
new ArrayList<>(matchedElements), startIdx, currentIdx - 1);
// Add context data
context.forEach(match::withMetadata);
matches.add(match);
// Notify callbacks
matchCallbacks.forEach(callback ->
callback.accept(matchedElements, context));
break;
}
} else {
break;
}
}
context.clear();
}
return matches;
}
public static class State<T> {
private final String name;
private final List<Transition<T>> transitions;
private boolean isFinal;
public State(String name) {
this.name = name;
this.transitions = new ArrayList<>();
this.isFinal = false;
}
public State<T> addTransition(Predicate<T> condition, State<T> target) {
return addTransition(condition, target, (e, ctx) -> {});
}
public State<T> addTransition(Predicate<T> condition, State<T> target,
BiConsumer<T, Map<String, Object>> action) {
transitions.add(new Transition<>(condition, target, action));
return this;
}
public State<T> markFinal() {
this.isFinal = true;
return this;
}
public State<T> transition(T element, Map<String, Object> context) {
for (Transition<T> transition : transitions) {
if (transition.condition.test(element)) {
transition.action.accept(element, context);
return transition.target;
}
}
return null;
}
public boolean isFinal() { return isFinal; }
public String getName() { return name; }
}
private static class Transition<T> {
final Predicate<T> condition;
final State<T> target;
final BiConsumer<T, Map<String, Object>> action;
Transition(Predicate<T> condition, State<T> target,
BiConsumer<T, Map<String, Object>> action) {
this.condition = condition;
this.target = target;
this.action = action;
}
}
}
// Example: Detect login failure patterns
public class SecurityPatternMatcher {
public static List<PatternMatch<SecurityEvent>> detectBruteForceAttempts(Stream<SecurityEvent> events) {
PatternStateMachine<SecurityEvent> fsm = new PatternStateMachine<>();
PatternStateMachine.State<SecurityEvent> start = fsm.startState;
PatternStateMachine.State<SecurityEvent> firstFailure = fsm.addState("firstFailure");
PatternStateMachine.State<SecurityEvent> secondFailure = fsm.addState("secondFailure");
PatternStateMachine.State<SecurityEvent> thirdFailure = fsm.addState("thirdFailure").markFinal();
// Transition conditions are plain Predicates and cannot see the machine's context map,
// so the user and the last event of the current attempt are tracked in this captured map.
Map<String, Object> attempt = new HashMap<>();
// Transitions
start.addTransition(
e -> "LOGIN_FAILURE".equals(e.getType()) && e.getUser() != null,
firstFailure,
(e, ctx) -> {
attempt.clear(); // a new attempt starts here
attempt.put("user", e.getUser());
attempt.put("lastEvent", e);
ctx.put("user", e.getUser());
}
);
firstFailure.addTransition(
e -> isRepeatedFailure(e, attempt),
secondFailure,
(e, ctx) -> attempt.put("lastEvent", e)
);
secondFailure.addTransition(
e -> isRepeatedFailure(e, attempt),
thirdFailure,
(e, ctx) -> ctx.put("bruteForceDetected", true)
);
return fsm.match(events);
}
// True if the event is a failure by the tracked user within 5 minutes of the previous failure
private static boolean isRepeatedFailure(SecurityEvent e, Map<String, Object> attempt) {
return "LOGIN_FAILURE".equals(e.getType())
&& attempt.get("user").equals(e.getUser())
&& isWithinTimeWindow(e, (SecurityEvent) attempt.get("lastEvent"), Duration.ofMinutes(5));
}
private static boolean isWithinTimeWindow(SecurityEvent current, SecurityEvent previous, Duration window) {
if (previous == null) return true;
return Duration.between(previous.getTimestamp(), current.getTimestamp()).compareTo(window) <= 0;
}
}
class SecurityEvent {
private final String type;
private final String user;
private final LocalDateTime timestamp;
private final String sourceIp;
public SecurityEvent(String type, String user, LocalDateTime timestamp, String sourceIp) {
this.type = type;
this.user = user;
this.timestamp = timestamp;
this.sourceIp = sourceIp;
}
// Getters
public String getType() { return type; }
public String getUser() { return user; }
public LocalDateTime getTimestamp() { return timestamp; }
public String getSourceIp() { return sourceIp; }
}
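The state machine above follows one attempt at a time, so it only recognizes failures that are directly consecutive in the event list. When events from several users are interleaved, a common alternative is to keep one small piece of state per user. The following is a minimal sketch of that idea using plain string pairs instead of SecurityEvent; FailureStreakSketch and usersWithStreak are hypothetical names, and time windows are left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class FailureStreakSketch {

    // Each event is a {user, type} pair. Returns the users that reach at least
    // `limit` LOGIN_FAILURE events in a row; any other event type resets the
    // streak of that user only.
    static Set<String> usersWithStreak(List<String[]> events, int limit) {
        Map<String, Integer> streaks = new HashMap<>();
        Set<String> flagged = new LinkedHashSet<>();
        for (String[] event : events) {
            String user = event[0];
            if ("LOGIN_FAILURE".equals(event[1])) {
                int streak = streaks.merge(user, 1, Integer::sum);
                if (streak >= limit) {
                    flagged.add(user);
                }
            } else {
                streaks.remove(user);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"alice", "LOGIN_FAILURE"},
                new String[] {"bob", "LOGIN_FAILURE"},
                new String[] {"alice", "LOGIN_FAILURE"},
                new String[] {"bob", "LOGIN_SUCCESS"},
                new String[] {"alice", "LOGIN_FAILURE"},
                new String[] {"bob", "LOGIN_FAILURE"});
        System.out.println(usersWithStreak(events, 3)); // [alice]
    }
}
```

The per-user map plays the role of the state machine's current state, with one independent machine per key.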
6. Performance-Optimized Pattern Matching
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
public class ParallelPatternMatcher {
// Parallel pattern matching for large datasets
public static <T> List<PatternMatch<T>> parallelMatch(Stream<T> stream,
StreamPattern<T> pattern,
int chunkSize) {
List<T> elements = stream.collect(Collectors.toList());
// Chunk the start indices, not the data: every chunk matches against the full list,
// so matches that cross a chunk boundary are found and reported indices stay global.
int chunkCount = (elements.size() + chunkSize - 1) / chunkSize;
return IntStream.range(0, chunkCount).parallel()
.boxed()
.flatMap(chunk -> {
int from = chunk * chunkSize;
int to = Math.min(from + chunkSize, elements.size());
return IntStream.range(from, to)
.mapToObj(i -> pattern.match(elements, i))
.filter(Optional::isPresent)
.map(Optional::get);
})
.collect(Collectors.toList());
}
// Bloom filter for fast pattern pre-screening
public static class PatternBloomFilter<T> {
private final BitSet bitSet;
private final int size;
private final List<Function<T, Integer>> hashFunctions;
public PatternBloomFilter(int size, int numHashFunctions) {
this.bitSet = new BitSet(size);
this.size = size;
this.hashFunctions = new ArrayList<>();
Random random = new Random(42); // Fixed seed for consistency
for (int i = 0; i < numHashFunctions; i++) {
final int seed = random.nextInt();
hashFunctions.add(element -> Math.abs((element.hashCode() ^ seed) % size));
}
}
public void add(T element) {
for (Function<T, Integer> hashFunction : hashFunctions) {
int index = hashFunction.apply(element);
bitSet.set(index);
}
}
public boolean mightContain(T element) {
for (Function<T, Integer> hashFunction : hashFunctions) {
int index = hashFunction.apply(element);
if (!bitSet.get(index)) {
return false;
}
}
return true;
}
// No method-level <T> here: it would shadow the class type parameter and break mightContain(element)
public Stream<T> preFilter(Stream<T> stream, Predicate<T> patternPredicate) {
return stream.filter(element -> {
if (!mightContain(element)) {
return false; // Definitely not matching
}
// Might match, need full verification
return patternPredicate.test(element);
});
}
}
}
// Caching pattern matcher for repeated patterns
public class CachingPatternMatcher<T> {
private final Map<String, List<PatternMatch<T>>> cache;
public CachingPatternMatcher() {
this.cache = new ConcurrentHashMap<>();
}
// Results are keyed by patternId only: reuse an id only for the same pattern and the same input data.
public List<PatternMatch<T>> matchCached(Stream<T> stream, String patternId,
Supplier<StreamPattern<T>> patternSupplier) {
return cache.computeIfAbsent(patternId, id -> {
// Use a fresh matcher per entry; a shared matcher would keep every pattern added by earlier calls
AdvancedStreamPatternMatcher<T> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(patternSupplier.get());
return matcher.matchAll(stream);
});
}
public void clearCache() {
cache.clear();
}
public void invalidatePattern(String patternId) {
cache.remove(patternId);
}
}
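When parallelizing a search for multi-element patterns, it matters what gets split. Splitting the data into independent chunks loses any occurrence that straddles a chunk boundary, while splitting the start indices does not. The following is a minimal sketch of the index-based approach for a fixed sub-list; ParallelWindowSketch and findOccurrences are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelWindowSketch {

    // Start indices at which `needle` occurs in `haystack`. The start indices are
    // checked in parallel, but every check reads from the full list, so an
    // occurrence is found wherever it lies.
    static <T> List<Integer> findOccurrences(List<T> haystack, List<T> needle) {
        int lastStart = haystack.size() - needle.size();
        return IntStream.rangeClosed(0, lastStart) // empty when the needle is longer
                .parallel()
                .filter(i -> haystack.subList(i, i + needle.size()).equals(needle))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 2, 3, 4, 1, 2, 3);
        System.out.println(findOccurrences(numbers, Arrays.asList(2, 3, 4))); // [1, 5]
    }
}
```

The result keeps encounter order because the index range is an ordered stream. For short lists the parallel overhead will likely outweigh any gain, so measuring before adopting this is advisable.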
7. Testing Pattern Matching
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.*;
import static org.junit.jupiter.api.Assertions.*;
class StreamPatternMatchingTest {
@Test
void testSequencePatternMatching() {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 2, 3, 4, 1, 2, 3);
AdvancedStreamPatternMatcher<Integer> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(new SequencePattern<>(
n -> n == 2,
n -> n == 3,
n -> n == 4
));
List<PatternMatch<Integer>> matches = matcher.matchAll(numbers.stream());
assertEquals(2, matches.size());
assertEquals(Arrays.asList(2, 3, 4), matches.get(0).getMatchedElements());
assertEquals(Arrays.asList(2, 3, 4), matches.get(1).getMatchedElements());
}
@Test
void testRepetitionPattern() {
List<String> words = Arrays.asList("a", "a", "a", "b", "c", "a", "a", "d");
RepetitionPattern<String> pattern = new RepetitionPattern<>(
word -> word.equals("a"), 2, 5
);
AdvancedStreamPatternMatcher<String> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(pattern);
List<PatternMatch<String>> matches = matcher.matchAll(words.stream());
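// matchAll tries every start index, so overlapping runs are reported as well
assertEquals(3, matches.size());
assertEquals(3, matches.get(0).getMatchedElements().size()); // "a", "a", "a" starting at index 0
assertEquals(2, matches.get(1).getMatchedElements().size()); // "a", "a" starting at index 1 (overlaps the first run)
assertEquals(2, matches.get(2).getMatchedElements().size()); // "a", "a" starting at index 5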
}
@Test
void testFinancialPatterns() {
List<StockTick> ticks = Arrays.asList(
new StockTick("AAPL", 150.0, 1000, LocalDateTime.now()),
new StockTick("AAPL", 152.0, 1200, LocalDateTime.now().plusMinutes(1)),
new StockTick("AAPL", 151.0, 800, LocalDateTime.now().plusMinutes(2)),
new StockTick("AAPL", 153.0, 1500, LocalDateTime.now().plusMinutes(3))
);
List<PatternMatch<StockTick>> surges = FinancialPatternMatcher.findVolumeSurges(
ticks.stream(), 1.5
);
assertFalse(surges.isEmpty());
}
}
Best Practices for Stream Pattern Matching
1. Performance Considerations:
- Use parallel streams for large datasets
- Implement caching for repeated patterns
- Consider bloom filters for pre-screening
- Optimize predicate complexity
2. Memory Management:
- Process streams sequentially when possible
- Use primitive streams for numeric patterns
- Implement custom collectors for complex patterns
- Consider external memory patterns for very large datasets
3. Pattern Design:
- Keep patterns focused and specific
- Use composition to build complex patterns
- Implement pattern validation
- Document pattern semantics clearly
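As an illustration of building complex patterns by composition, the default methods on java.util.function.Predicate (and, or, negate) let small conditions be combined without writing new classes. The following is a minimal sketch; PredicateCompositionSketch, its constants, and the select helper are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class PredicateCompositionSketch {

    // Small, focused conditions
    static final Predicate<Integer> POSITIVE = n -> n > 0;
    static final Predicate<Integer> EVEN = n -> n % 2 == 0;

    // Composite conditions built from the small ones
    static final Predicate<Integer> POSITIVE_EVEN = POSITIVE.and(EVEN);
    static final Predicate<Integer> NOT_POSITIVE_EVEN = POSITIVE_EVEN.negate();

    static List<Integer> select(List<Integer> values, Predicate<Integer> condition) {
        return values.stream().filter(condition).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(-4, -1, 0, 3, 6, 8);
        System.out.println(select(values, POSITIVE_EVEN));     // [6, 8]
        System.out.println(select(values, NOT_POSITIVE_EVEN)); // [-4, -1, 0, 3]
    }
}
```

Composite predicates like these can be passed wherever the pattern classes in this article expect a Predicate.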
4. Error Handling:
// The type parameter belongs to the static method; a class-level <T> would be unused and shadowed
public class SafePatternMatcher {
public static <T> List<PatternMatch<T>> safeMatch(Stream<T> stream,
StreamPattern<T> pattern) {
try {
AdvancedStreamPatternMatcher<T> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(pattern);
return matcher.matchAll(stream);
} catch (Exception e) {
System.err.println("Pattern matching failed: " + e.getMessage());
return Collections.emptyList();
}
}
}
Conclusion
Pattern matching in Java streams enables sophisticated data analysis and processing capabilities. Key takeaways:
- Sequence Detection: Identify ordered patterns in data streams
- Condition Patterns: Find elements matching complex conditions
- Temporal Patterns: Detect time-based sequences and relationships
- Stateful Patterns: Implement complex state machines for pattern recognition
- Performance Optimization: Use parallel processing and caching for large datasets
Implementation Checklist:
- ✅ Define clear pattern specifications
- ✅ Implement appropriate pattern matching algorithms
- ✅ Consider performance and memory requirements
- ✅ Add comprehensive testing
- ✅ Implement proper error handling
- ✅ Consider parallel processing for large datasets
- ✅ Add monitoring and metrics
Pattern matching in streams is particularly valuable for:
- Financial analysis (trading patterns, anomalies)
- Log analysis (error patterns, security threats)
- IoT data processing (sensor patterns, predictive maintenance)
- Network monitoring (intrusion detection, performance issues)
- Business intelligence (customer behavior patterns, trends)
By mastering these pattern matching techniques, you can extract valuable insights from streaming data and build sophisticated real-time data processing applications.