Advanced Pattern Matching in Java Streams

Mastering Complex Data Processing with Stream Patterns


Pattern matching in streams enables sophisticated data processing by detecting sequences, relationships, and conditions within data flows. While Java doesn't have built-in pattern matching for streams like some functional languages, we can implement powerful pattern matching using Stream API combinations, collectors, and custom utilities.

Pattern Matching Concepts in Streams

Common Pattern Types:

  • Sequence Patterns: Detecting ordered element sequences
  • Condition Patterns: Finding elements that match specific conditions
  • Grouping Patterns: Identifying patterns in grouped data
  • Temporal Patterns: Time-based sequence detection
  • Stateful Patterns: Patterns that depend on previous elements

1. Basic Pattern Matching Utilities

Let's start with foundational pattern matching utilities:

import java.util.*;
import java.util.function.*;
import java.util.stream.*;
public class StreamPatternMatcher {
// 1. Find sequences of consecutive elements matching a predicate
public static <T> List<List<T>> findSequences(Stream<T> stream, 
Predicate<T> predicate, 
int minLength) {
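// Walk the elements in encounter order; run detection is inherently sequential
List<List<T>> sequences = new ArrayList<>();
List<T> currentList = new ArrayList<>();
Iterator<T> iterator = stream.iterator();
while (iterator.hasNext()) {
T element = iterator.next();
if (predicate.test(element)) {
currentList.add(element);
} else {
// A non-matching element closes the current run
if (!currentList.isEmpty() && currentList.size() >= minLength) {
sequences.add(currentList);
}
currentList = new ArrayList<>();
}
}
// Keep a run that extends to the end of the stream
if (!currentList.isEmpty() && currentList.size() >= minLength) {
sequences.add(currentList);
}
return sequences;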
}
// 2. Find patterns where elements satisfy a sequence of predicates
@SafeVarargs
public static <T> List<List<T>> findPatternSequence(Stream<T> stream, 
Predicate<T>... predicates) {
List<T> elements = stream.collect(Collectors.toList());
List<List<T>> results = new ArrayList<>();
for (int i = 0; i <= elements.size() - predicates.length; i++) {
boolean matches = true;
List<T> sequence = new ArrayList<>();
for (int j = 0; j < predicates.length && matches; j++) {
T element = elements.get(i + j);
if (predicates[j].test(element)) {
sequence.add(element);
} else {
matches = false;
}
}
if (matches) {
results.add(sequence);
}
}
return results;
}
// 3. Detect rising/falling sequences in numeric streams
public static <T extends Comparable<T>> List<List<T>> findMonotonicSequences(
Stream<T> stream, boolean rising, int minLength) {
List<T> elements = stream.collect(Collectors.toList());
List<List<T>> sequences = new ArrayList<>();
List<T> currentSequence = new ArrayList<>();
for (int i = 0; i < elements.size(); i++) {
T current = elements.get(i);
if (currentSequence.isEmpty()) {
currentSequence.add(current);
} else {
T previous = currentSequence.get(currentSequence.size() - 1);
int comparison = current.compareTo(previous);
boolean conditionMet = rising ? comparison > 0 : comparison < 0;
if (conditionMet) {
currentSequence.add(current);
} else {
if (currentSequence.size() >= minLength) {
sequences.add(new ArrayList<>(currentSequence));
}
currentSequence = new ArrayList<>();
currentSequence.add(current);
}
}
}
if (currentSequence.size() >= minLength) {
sequences.add(currentSequence);
}
return sequences;
}
// 4. Pattern matching with sliding window
public static <T> Stream<List<T>> slidingWindow(Stream<T> stream, int windowSize) {
List<T> elements = stream.collect(Collectors.toList());
return IntStream.range(0, elements.size() - windowSize + 1)
.mapToObj(i -> elements.subList(i, i + windowSize));
}
// 5. Find repeated patterns
public static <T> Map<List<T>, Long> findRepeatedPatterns(Stream<T> stream, 
int patternLength, 
long minOccurrences) {
List<T> elements = stream.collect(Collectors.toList());
return IntStream.range(0, elements.size() - patternLength + 1)
.mapToObj(i -> elements.subList(i, i + patternLength))
.collect(Collectors.groupingBy(pattern -> pattern, Collectors.counting()))
.entrySet().stream()
.filter(entry -> entry.getValue() >= minOccurrences)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
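
The slidingWindow utility above collects the whole stream into a list before producing any window. Where that is too costly, or the source is unbounded, the windows can be produced lazily instead. The following is a minimal sketch under stated assumptions: LazySlidingWindow is a hypothetical helper that is not part of StreamPatternMatcher, and it assumes the source contains no null elements, because ArrayDeque rejects them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper (illustration only, not part of StreamPatternMatcher)
final class LazySlidingWindow {

    // Returns a lazily evaluated stream of overlapping windows of the given size
    static <T> Stream<List<T>> of(Stream<T> source, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        Iterator<T> input = source.iterator();
        Iterator<List<T>> windows = new Iterator<List<T>>() {
            private final Deque<T> buffer = new ArrayDeque<>(windowSize);

            @Override
            public boolean hasNext() {
                // Pull from the source only until the buffer holds one full window
                while (buffer.size() < windowSize && input.hasNext()) {
                    buffer.addLast(input.next());
                }
                return buffer.size() == windowSize;
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> window = new ArrayList<>(buffer); // snapshot of the current window
                buffer.removeFirst();                     // slide forward by one element
                return window;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(windows, Spliterator.ORDERED), false);
    }
}
```

For example, LazySlidingWindow.of(Stream.of(1, 2, 3, 4), 3) yields [1, 2, 3] and [2, 3, 4]. Because at most windowSize elements are buffered at a time, the helper also works on an infinite source when combined with limit().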

2. Advanced Pattern Matching Framework

Let's build a comprehensive pattern matching framework:

import java.util.*;
import java.util.function.*;
import java.util.stream.*;
// Pattern matching result container
class PatternMatch<T> {
private final List<T> matchedElements;
private final int startIndex;
private final int endIndex;
private final Map<String, Object> metadata;
public PatternMatch(List<T> matchedElements, int startIndex, int endIndex) {
this.matchedElements = new ArrayList<>(matchedElements);
this.startIndex = startIndex;
this.endIndex = endIndex;
this.metadata = new HashMap<>();
}
public PatternMatch<T> withMetadata(String key, Object value) {
this.metadata.put(key, value);
return this;
}
// Getters
public List<T> getMatchedElements() { return matchedElements; }
public int getStartIndex() { return startIndex; }
public int getEndIndex() { return endIndex; }
public Map<String, Object> getMetadata() { return metadata; }
@Override
public String toString() {
return String.format("PatternMatch[%d-%d]: %s", startIndex, endIndex, matchedElements);
}
}
// Pattern specification interface
interface StreamPattern<T> {
Optional<PatternMatch<T>> match(List<T> elements, int startIndex);
String getDescription();
}
// Pattern matcher engine
public class AdvancedStreamPatternMatcher<T> {
private final List<StreamPattern<T>> patterns;
public AdvancedStreamPatternMatcher() {
this.patterns = new ArrayList<>();
}
public AdvancedStreamPatternMatcher<T> addPattern(StreamPattern<T> pattern) {
patterns.add(pattern);
return this;
}
public List<PatternMatch<T>> matchAll(Stream<T> stream) {
List<T> elements = stream.collect(Collectors.toList());
List<PatternMatch<T>> matches = new ArrayList<>();
for (int i = 0; i < elements.size(); i++) {
for (StreamPattern<T> pattern : patterns) {
Optional<PatternMatch<T>> match = pattern.match(elements, i);
match.ifPresent(matches::add);
}
}
return matches;
}
public <R> Stream<R> matchAndTransform(Stream<T> stream, 
Function<PatternMatch<T>, R> transformer) {
return matchAll(stream).stream().map(transformer);
}
}
// Common pattern implementations
class SequencePattern<T> implements StreamPattern<T> {
private final List<Predicate<T>> sequencePredicates;
private final String description;
@SafeVarargs
public SequencePattern(Predicate<T>... predicates) {
this.sequencePredicates = Arrays.asList(predicates);
this.description = "Sequence of " + predicates.length + " elements";
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
if (startIndex + sequencePredicates.size() > elements.size()) {
return Optional.empty();
}
List<T> matched = new ArrayList<>();
for (int i = 0; i < sequencePredicates.size(); i++) {
T element = elements.get(startIndex + i);
if (sequencePredicates.get(i).test(element)) {
matched.add(element);
} else {
return Optional.empty();
}
}
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + sequencePredicates.size() - 1);
return Optional.of(match);
}
@Override
public String getDescription() {
return description;
}
}
class RepetitionPattern<T> implements StreamPattern<T> {
private final Predicate<T> elementPredicate;
private final int minRepetitions;
private final int maxRepetitions;
public RepetitionPattern(Predicate<T> elementPredicate, int minRepetitions, int maxRepetitions) {
this.elementPredicate = elementPredicate;
this.minRepetitions = minRepetitions;
this.maxRepetitions = maxRepetitions;
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
List<T> matched = new ArrayList<>();
int count = 0;
for (int i = startIndex; i < elements.size() && count < maxRepetitions; i++) {
T element = elements.get(i);
if (elementPredicate.test(element)) {
matched.add(element);
count++;
} else {
break;
}
}
if (count >= minRepetitions) {
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + count - 1);
match.withMetadata("repetitionCount", count);
return Optional.of(match);
}
return Optional.empty();
}
@Override
public String getDescription() {
return String.format("Repetition pattern: %d-%d repetitions", minRepetitions, maxRepetitions);
}
}
class RangePattern<T extends Comparable<T>> implements StreamPattern<T> {
private final T min;
private final T max;
private final int minLength;
public RangePattern(T min, T max, int minLength) {
this.min = min;
this.max = max;
this.minLength = minLength;
}
@Override
public Optional<PatternMatch<T>> match(List<T> elements, int startIndex) {
List<T> matched = new ArrayList<>();
for (int i = startIndex; i < elements.size(); i++) {
T element = elements.get(i);
if (element.compareTo(min) >= 0 && element.compareTo(max) <= 0) {
matched.add(element);
} else {
break;
}
}
if (matched.size() >= minLength) {
PatternMatch<T> match = new PatternMatch<>(
matched, startIndex, startIndex + matched.size() - 1);
match.withMetadata("averageValue", calculateAverage(matched));
return Optional.of(match);
}
return Optional.empty();
}
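// Averages numeric elements only; yields 0.0 when T is not a Number (e.g. String),
// instead of failing with a ClassCastException
private double calculateAverage(List<T> elements) {
return elements.stream()
.filter(e -> e instanceof Number)
.mapToDouble(e -> ((Number) e).doubleValue())
.average()
.orElse(0.0);
}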
@Override
public String getDescription() {
return String.format("Range pattern: %s to %s, min length %d", min, max, minLength);
}
}

3. Real-World Pattern Matching Examples

Financial Data Patterns:

import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.*;
// Financial data point
class StockTick {
private final String symbol;
private final double price;
private final double volume;
private final LocalDateTime timestamp;
public StockTick(String symbol, double price, double volume, LocalDateTime timestamp) {
this.symbol = symbol;
this.price = price;
this.volume = volume;
this.timestamp = timestamp;
}
// Getters
public String getSymbol() { return symbol; }
public double getPrice() { return price; }
public double getVolume() { return volume; }
public LocalDateTime getTimestamp() { return timestamp; }
@Override
public String toString() {
return String.format("%s: $%.2f (vol: %.0f)", symbol, price, volume);
}
}
public class FinancialPatternMatcher {
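// 1. Detect price spikes
// Assumption: spikeThreshold is a relative tick-to-tick increase (e.g. 0.05 for 5%)
public static List<PatternMatch<StockTick>> findPriceSpikes(Stream<StockTick> ticks, 
double spikeThreshold) {
List<StockTick> tickList = ticks.collect(Collectors.toList());
// SequencePattern tests each element in isolation, so adjacent ticks
// are compared by index instead
return IntStream.range(1, tickList.size())
.filter(i -> {
double previousPrice = tickList.get(i - 1).getPrice();
double currentPrice = tickList.get(i).getPrice();
return previousPrice > 0 && (currentPrice - previousPrice) / previousPrice > spikeThreshold;
})
.mapToObj(i -> new PatternMatch<>(
Arrays.asList(tickList.get(i - 1), tickList.get(i)), i - 1, i))
.collect(Collectors.toList());
}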
// 2. Find volume surges
public static List<PatternMatch<StockTick>> findVolumeSurges(Stream<StockTick> ticks, 
double volumeMultiplier) {
List<StockTick> tickList = ticks.collect(Collectors.toList());
return IntStream.range(1, tickList.size())
.filter(i -> {
double currentVolume = tickList.get(i).getVolume();
double previousVolume = tickList.get(i - 1).getVolume();
return currentVolume > previousVolume * volumeMultiplier;
})
.mapToObj(i -> {
List<StockTick> match = Arrays.asList(tickList.get(i - 1), tickList.get(i));
return new PatternMatch<>(match, i - 1, i)
.withMetadata("volumeIncrease", 
tickList.get(i).getVolume() / tickList.get(i - 1).getVolume());
})
.collect(Collectors.toList());
}
// 3. Detect support/resistance levels
public static Map<Double, Long> findSupportResistanceLevels(Stream<StockTick> ticks, 
double priceGranularity) {
return ticks.collect(Collectors.groupingBy(
tick -> Math.round(tick.getPrice() / priceGranularity) * priceGranularity,
Collectors.counting()
)).entrySet().stream()
.filter(entry -> entry.getValue() > 1) // At least 2 touches
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 4. Pattern: a local peak reached after two rising ticks and followed by two falling ticks
public static List<List<StockTick>> findRisingPeaks(Stream<StockTick> ticks) {
List<StockTick> tickList = ticks.collect(Collectors.toList());
List<List<StockTick>> peaks = new ArrayList<>();
for (int i = 2; i < tickList.size() - 2; i++) {
StockTick prev2 = tickList.get(i - 2);
StockTick prev1 = tickList.get(i - 1);
StockTick current = tickList.get(i);
StockTick next1 = tickList.get(i + 1);
StockTick next2 = tickList.get(i + 2);
// Check that current is a local maximum
boolean isPeak = current.getPrice() > prev1.getPrice() && 
current.getPrice() > next1.getPrice();
// Prices rise into the peak...
boolean isRising = prev2.getPrice() < prev1.getPrice() && 
prev1.getPrice() < current.getPrice();
// ...and keep falling after it
boolean keepsFalling = next1.getPrice() > next2.getPrice();
if (isPeak && isRising && keepsFalling) {
peaks.add(Arrays.asList(prev2, prev1, current, next1, next2));
}
}
return peaks;
}
}
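
The bucketing arithmetic inside findSupportResistanceLevels is easier to follow in isolation. The sketch below uses a hypothetical PriceBuckets class (not part of the article's code) and plain doubles instead of StockTick. It deliberately uses a granularity of 0.5, which is exactly representable in binary floating point; with a granularity such as 0.1 the multiplication can leave rounding noise in the keys, so integer ticks or BigDecimal are probably a safer choice for map keys in production code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper (illustration only)
final class PriceBuckets {

    // Snap a price to the nearest multiple of granularity
    static double bucket(double price, double granularity) {
        return Math.round(price / granularity) * granularity;
    }

    // Count how many prices fall into each bucket, sorted by price level
    static Map<Double, Long> touches(List<Double> prices, double granularity) {
        return prices.stream().collect(Collectors.groupingBy(
                p -> bucket(p, granularity), TreeMap::new, Collectors.counting()));
    }
}
```

With prices 100.1, 99.9, 100.2, 101.6 and 101.4 and a granularity of 0.5, the first three prices snap to 100.0 and the last two to 101.5, so touches returns {100.0=3, 101.5=2}.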

Log Analysis Patterns:

import java.util.*;
import java.util.stream.*;
class LogEntry {
private final String timestamp;
private final String level;
private final String message;
private final String source;
public LogEntry(String timestamp, String level, String message, String source) {
this.timestamp = timestamp;
this.level = level;
this.message = message;
this.source = source;
}
// Getters
public String getTimestamp() { return timestamp; }
public String getLevel() { return level; }
public String getMessage() { return message; }
public String getSource() { return source; }
public boolean isError() { return "ERROR".equals(level); }
public boolean isWarning() { return "WARN".equals(level); }
@Override
public String toString() {
return String.format("[%s] %s: %s", timestamp, level, message);
}
}
public class LogPatternMatcher {
// 1. Detect error cascades (multiple errors in short sequence)
public static List<PatternMatch<LogEntry>> findErrorCascades(Stream<LogEntry> logs, 
int maxGapBetweenErrors) {
List<LogEntry> logList = logs.collect(Collectors.toList());
List<PatternMatch<LogEntry>> cascades = new ArrayList<>();
List<LogEntry> currentCascade = new ArrayList<>();
int cascadeStartIndex = -1; // index of the first error in the current cascade
int lastErrorIndex = -1;    // index of the most recent error seen
for (int i = 0; i < logList.size(); i++) {
LogEntry entry = logList.get(i);
if (entry.isError()) {
if (!currentCascade.isEmpty() && (i - lastErrorIndex) > maxGapBetweenErrors) {
// Gap too large: close the current cascade and start a new one
if (currentCascade.size() >= 2) {
cascades.add(new PatternMatch<>(currentCascade, cascadeStartIndex, lastErrorIndex));
}
currentCascade = new ArrayList<>();
}
if (currentCascade.isEmpty()) {
cascadeStartIndex = i;
}
currentCascade.add(entry);
lastErrorIndex = i;
}
}
// Don't forget the last cascade. The index range may span non-error entries
// that sit between the errors; matchedElements holds the errors only.
if (currentCascade.size() >= 2) {
cascades.add(new PatternMatch<>(currentCascade, cascadeStartIndex, lastErrorIndex));
}
return cascades;
}
// 2. Find patterns of warnings followed by errors
public static List<PatternMatch<LogEntry>> findWarningErrorPatterns(Stream<LogEntry> logs) {
AdvancedStreamPatternMatcher<LogEntry> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(new SequencePattern<>(
LogEntry::isWarning,
LogEntry::isError
));
return matcher.matchAll(logs);
}
// 3. Detect repeated error messages
public static Map<String, Long> findRepeatedErrors(Stream<LogEntry> logs, 
int minRepetitions) {
return logs.filter(LogEntry::isError)
.collect(Collectors.groupingBy(
LogEntry::getMessage,
Collectors.counting()
))
.entrySet().stream()
.filter(entry -> entry.getValue() >= minRepetitions)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 4. Pattern: Error -> Warning -> Error (sandwich pattern)
public static List<List<LogEntry>> findErrorSandwichPatterns(Stream<LogEntry> logs) {
List<LogEntry> logList = logs.collect(Collectors.toList());
List<List<LogEntry>> sandwiches = new ArrayList<>();
for (int i = 1; i < logList.size() - 1; i++) {
LogEntry prev = logList.get(i - 1);
LogEntry current = logList.get(i);
LogEntry next = logList.get(i + 1);
if (prev.isError() && current.isWarning() && next.isError()) {
sandwiches.add(Arrays.asList(prev, current, next));
}
}
return sandwiches;
}
}
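
Level-based patterns such as the sandwich pattern can also be expressed with regular expressions by encoding each log level as one character. This is a sketch of an alternative technique rather than the approach used above: LevelRegexMatcher and its one-letter encoding are assumptions made for illustration, and it works on plain level strings instead of LogEntry objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (illustration only)
final class LevelRegexMatcher {

    // Encode each level as one character, so a position in the string
    // equals the index of the entry in the list
    static String encode(List<String> levels) {
        StringBuilder sb = new StringBuilder(levels.size());
        for (String level : levels) {
            if ("ERROR".equals(level)) {
                sb.append('E');
            } else if ("WARN".equals(level)) {
                sb.append('W');
            } else {
                sb.append('.'); // any other level, including null
            }
        }
        return sb.toString();
    }

    // Returns {startInclusive, endInclusive} index pairs, one per regex match
    static List<int[]> find(List<String> levels, String regex) {
        Matcher m = Pattern.compile(regex).matcher(encode(levels));
        List<int[]> ranges = new ArrayList<>();
        while (m.find()) {
            ranges.add(new int[] { m.start(), m.end() - 1 });
        }
        return ranges;
    }
}
```

For the levels INFO, ERROR, WARN, ERROR, ERROR, INFO the encoded string is ".EWEE.", so the regex "EWE" matches indices 1 to 3 and "E{2,}" matches indices 3 to 4. Note that Matcher.find() reports non-overlapping matches, whereas the index-by-index loops above also report overlapping ones.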

4. Temporal Pattern Matching

import java.time.*;
import java.util.*;
import java.util.stream.*;
class TemporalEvent {
private final String eventType;
private final LocalDateTime timestamp;
private final Map<String, Object> data;
public TemporalEvent(String eventType, LocalDateTime timestamp, Map<String, Object> data) {
this.eventType = eventType;
this.timestamp = timestamp;
this.data = new HashMap<>(data);
}
// Getters
public String getEventType() { return eventType; }
public LocalDateTime getTimestamp() { return timestamp; }
public Map<String, Object> getData() { return data; }
@Override
public String toString() {
return String.format("%s @ %s", eventType, timestamp);
}
}
public class TemporalPatternMatcher {
// 1. Find events occurring within a time window
public static List<PatternMatch<TemporalEvent>> findEventsInTimeWindow(
Stream<TemporalEvent> events, Duration windowSize, int minEvents) {
List<TemporalEvent> eventList = events
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
for (int i = 0; i < eventList.size(); i++) {
TemporalEvent startEvent = eventList.get(i);
LocalDateTime windowEnd = startEvent.getTimestamp().plus(windowSize);
List<TemporalEvent> windowEvents = new ArrayList<>();
windowEvents.add(startEvent);
for (int j = i + 1; j < eventList.size(); j++) {
TemporalEvent currentEvent = eventList.get(j);
if (!currentEvent.getTimestamp().isAfter(windowEnd)) {
windowEvents.add(currentEvent);
} else {
break;
}
}
if (windowEvents.size() >= minEvents) {
matches.add(new PatternMatch<>(
windowEvents, i, i + windowEvents.size() - 1
).withMetadata("windowDuration", windowSize));
}
}
return matches;
}
// 2. Detect event sequences with specific timing patterns
public static List<PatternMatch<TemporalEvent>> findTimedSequences(
Stream<TemporalEvent> events, 
List<String> eventSequence,
Duration maxGapBetweenEvents) {
List<TemporalEvent> eventList = events
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
for (int i = 0; i <= eventList.size() - eventSequence.size(); i++) {
boolean sequenceMatches = true;
List<TemporalEvent> matchedEvents = new ArrayList<>();
for (int j = 0; j < eventSequence.size() && sequenceMatches; j++) {
TemporalEvent currentEvent = eventList.get(i + j);
// Check event type
if (!eventSequence.get(j).equals(currentEvent.getEventType())) {
sequenceMatches = false;
break;
}
// Check timing (except for first event)
if (j > 0) {
TemporalEvent previousEvent = matchedEvents.get(j - 1);
Duration gap = Duration.between(
previousEvent.getTimestamp(), 
currentEvent.getTimestamp()
);
if (gap.compareTo(maxGapBetweenEvents) > 0) {
sequenceMatches = false;
break;
}
}
matchedEvents.add(currentEvent);
}
if (sequenceMatches) {
matches.add(new PatternMatch<>(
matchedEvents, i, i + eventSequence.size() - 1
));
}
}
return matches;
}
// 3. Find periodic event patterns
public static List<PatternMatch<TemporalEvent>> findPeriodicPatterns(
Stream<TemporalEvent> events, 
String eventType,
Duration expectedPeriod,
Duration tolerance) {
List<TemporalEvent> eventList = events
.filter(e -> e.getEventType().equals(eventType))
.sorted(Comparator.comparing(TemporalEvent::getTimestamp))
.collect(Collectors.toList());
List<PatternMatch<TemporalEvent>> matches = new ArrayList<>();
List<TemporalEvent> currentPattern = new ArrayList<>();
int patternStart = -1; // index (in the filtered, sorted eventList) where the current run begins
for (int i = 1; i < eventList.size(); i++) {
TemporalEvent current = eventList.get(i);
TemporalEvent previous = eventList.get(i - 1);
Duration actualGap = Duration.between(previous.getTimestamp(), current.getTimestamp());
Duration gapDifference = actualGap.minus(expectedPeriod).abs();
if (gapDifference.compareTo(tolerance) <= 0) {
if (currentPattern.isEmpty()) {
currentPattern.add(previous);
patternStart = i - 1;
}
currentPattern.add(current);
} else {
if (currentPattern.size() >= 3) { // At least 3 events for a pattern
// Track indices directly instead of calling indexOf, which is O(n) per lookup
matches.add(new PatternMatch<>(
currentPattern, patternStart, patternStart + currentPattern.size() - 1));
}
currentPattern.clear();
}
}
// Check last pattern
if (currentPattern.size() >= 3) {
matches.add(new PatternMatch<>(
currentPattern, patternStart, patternStart + currentPattern.size() - 1));
}
return matches;
}
}
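
findEventsInTimeWindow rescans forward from every start event, which can approach quadratic time when many events fall into each window. Because the events are sorted, the window end never needs to move backwards, so a two-pointer scan gives the same counts in linear time. The sketch below is an illustration under assumptions: TimeWindowScan is a hypothetical class, it works on bare timestamps rather than TemporalEvent, and it expects the list to be sorted already.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (illustration only)
final class TimeWindowScan {

    // For sorted timestamps, returns for each start index i the number of
    // events whose timestamp lies in [t_i, t_i + window]
    static int[] countsPerWindow(List<LocalDateTime> sorted, Duration window) {
        int[] counts = new int[sorted.size()];
        int end = 0; // exclusive end of the current window; only moves forward
        for (int start = 0; start < sorted.size(); start++) {
            if (end < start) {
                end = start; // guard for degenerate (negative) windows
            }
            LocalDateTime limit = sorted.get(start).plus(window);
            while (end < sorted.size() && !sorted.get(end).isAfter(limit)) {
                end++;
            }
            counts[start] = end - start;
        }
        return counts;
    }
}
```

For events at 0 s, 10 s, 20 s, 90 s and 95 s after a common base time and a 30-second window, the result is [3, 2, 1, 2, 1]. A window qualifies as a match when its count reaches minEvents.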

5. Stateful Pattern Matching

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
// State machine for complex pattern matching
class PatternStateMachine<T> {
private final List<State<T>> states;
private final State<T> startState;
private final List<BiConsumer<List<T>, Map<String, Object>>> matchCallbacks;
public PatternStateMachine() {
this.states = new ArrayList<>();
this.matchCallbacks = new ArrayList<>();
this.startState = new State<>("start");
states.add(startState);
}
public State<T> addState(String name) {
State<T> state = new State<>(name);
states.add(state);
return state;
}
public void addMatchCallback(BiConsumer<List<T>, Map<String, Object>> callback) {
matchCallbacks.add(callback);
}
public List<PatternMatch<T>> match(Stream<T> stream) {
List<T> elements = stream.collect(Collectors.toList());
List<PatternMatch<T>> matches = new ArrayList<>();
Map<String, Object> context = new HashMap<>();
for (int startIdx = 0; startIdx < elements.size(); startIdx++) {
State<T> currentState = startState;
List<T> matchedElements = new ArrayList<>();
int currentIdx = startIdx;
while (currentIdx < elements.size() && currentState != null) {
T currentElement = elements.get(currentIdx);
State<T> nextState = currentState.transition(currentElement, context);
if (nextState != null) {
matchedElements.add(currentElement);
currentState = nextState;
currentIdx++;
// Check if we reached a final state
if (currentState.isFinal()) {
PatternMatch<T> match = new PatternMatch<>(
new ArrayList<>(matchedElements), startIdx, currentIdx - 1);
// Add context data
context.forEach(match::withMetadata);
matches.add(match);
// Notify callbacks
matchCallbacks.forEach(callback -> 
callback.accept(matchedElements, context));
break;
}
} else {
break;
}
}
context.clear();
}
return matches;
}
// Accessor so that code outside this class can attach transitions to the start state
public State<T> getStartState() { return startState; }
public static class State<T> {
private final String name;
private final List<Transition<T>> transitions;
private boolean isFinal;
public State(String name) {
this.name = name;
this.transitions = new ArrayList<>();
this.isFinal = false;
}
// Context-free transition: the condition looks at the element only
public State<T> addTransition(Predicate<T> condition, State<T> target) {
return addTransition((e, ctx) -> condition.test(e), target, (e, ctx) -> {});
}
// Context-aware transition: the condition can read values stored by earlier actions
public State<T> addTransition(BiPredicate<T, Map<String, Object>> condition, State<T> target, 
BiConsumer<T, Map<String, Object>> action) {
transitions.add(new Transition<>(condition, target, action));
return this;
}
public State<T> markFinal() {
this.isFinal = true;
return this;
}
public State<T> transition(T element, Map<String, Object> context) {
for (Transition<T> transition : transitions) {
if (transition.condition.test(element, context)) {
transition.action.accept(element, context);
return transition.target;
}
}
return null;
}
public boolean isFinal() { return isFinal; }
public String getName() { return name; }
}
private static class Transition<T> {
final BiPredicate<T, Map<String, Object>> condition;
final State<T> target;
final BiConsumer<T, Map<String, Object>> action;
Transition(BiPredicate<T, Map<String, Object>> condition, State<T> target, 
BiConsumer<T, Map<String, Object>> action) {
this.condition = condition;
this.target = target;
this.action = action;
}
}
}
// Example: Detect login failure patterns
public class SecurityPatternMatcher {
public static List<PatternMatch<SecurityEvent>> detectBruteForceAttempts(Stream<SecurityEvent> events) {
PatternStateMachine<SecurityEvent> fsm = new PatternStateMachine<>();
PatternStateMachine.State<SecurityEvent> start = fsm.getStartState();
PatternStateMachine.State<SecurityEvent> firstFailure = fsm.addState("firstFailure");
PatternStateMachine.State<SecurityEvent> secondFailure = fsm.addState("secondFailure");
PatternStateMachine.State<SecurityEvent> thirdFailure = fsm.addState("thirdFailure").markFinal();
// Transitions: each condition receives the element and the shared context map
start.addTransition(
(e, ctx) -> "LOGIN_FAILURE".equals(e.getType()) && e.getUser() != null,
firstFailure,
(e, ctx) -> {
ctx.put("user", e.getUser());
ctx.put("lastEvent", e); // baseline for the first time-window check
}
);
firstFailure.addTransition(
(e, ctx) -> "LOGIN_FAILURE".equals(e.getType()) && 
Objects.equals(e.getUser(), ctx.get("user")) &&
isWithinTimeWindow(e, (SecurityEvent) ctx.get("lastEvent"), Duration.ofMinutes(5)),
secondFailure,
(e, ctx) -> ctx.put("lastEvent", e)
);
secondFailure.addTransition(
(e, ctx) -> "LOGIN_FAILURE".equals(e.getType()) && 
Objects.equals(e.getUser(), ctx.get("user")) &&
isWithinTimeWindow(e, (SecurityEvent) ctx.get("lastEvent"), Duration.ofMinutes(5)),
thirdFailure,
(e, ctx) -> ctx.put("bruteForceDetected", true)
);
return fsm.match(events);
}
private static boolean isWithinTimeWindow(SecurityEvent current, SecurityEvent previous, Duration window) {
if (previous == null) return true;
return Duration.between(previous.getTimestamp(), current.getTimestamp()).compareTo(window) <= 0;
}
}
class SecurityEvent {
private final String type;
private final String user;
private final LocalDateTime timestamp;
private final String sourceIp;
public SecurityEvent(String type, String user, LocalDateTime timestamp, String sourceIp) {
this.type = type;
this.user = user;
this.timestamp = timestamp;
this.sourceIp = sourceIp;
}
// Getters
public String getType() { return type; }
public String getUser() { return user; }
public LocalDateTime getTimestamp() { return timestamp; }
public String getSourceIp() { return sourceIp; }
}
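
PatternStateMachine restarts the automaton at every start index, which requires the whole input in memory. When the pattern is simple, the same idea can run in a single pass that keeps only the current state. The sketch below is a simplified illustration, not a replacement for the class above: ThreeFailuresDetector is a hypothetical name, it looks at event type strings only, and it ignores users and time windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-pass detector (illustration only)
final class ThreeFailuresDetector {

    enum State { START, ONE_FAILURE, TWO_FAILURES }

    // Returns the index at which each run of three consecutive
    // "LOGIN_FAILURE" events completes
    static List<Integer> detect(List<String> eventTypes) {
        List<Integer> alerts = new ArrayList<>();
        State state = State.START;
        for (int i = 0; i < eventTypes.size(); i++) {
            boolean failure = "LOGIN_FAILURE".equals(eventTypes.get(i));
            if (!failure) {
                state = State.START; // any other event resets the automaton
                continue;
            }
            switch (state) {
                case START:
                    state = State.ONE_FAILURE;
                    break;
                case ONE_FAILURE:
                    state = State.TWO_FAILURES;
                    break;
                case TWO_FAILURES:
                    alerts.add(i);       // third consecutive failure
                    state = State.START; // start counting again after an alert
                    break;
            }
        }
        return alerts;
    }
}
```

For two failures, a success and then four failures, detect returns [5]: the success resets the state, and the third failure of the second run sits at index 5. Since the state is reset after each alert, overlapping runs are reported only once.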

6. Performance-Optimized Pattern Matching

import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
public class ParallelPatternMatcher {
// Parallel pattern matching for large datasets
public static <T> List<PatternMatch<T>> parallelMatch(Stream<T> stream, 
StreamPattern<T> pattern, 
int chunkSize) {
if (chunkSize <= 0) {
throw new IllegalArgumentException("chunkSize must be positive");
}
List<T> elements = stream.collect(Collectors.toList());
int chunkCount = (elements.size() + chunkSize - 1) / chunkSize;
// Each task scans one range of start indices but matches against the full list, so
// matches that cross a chunk boundary are still found and reported indices stay global.
// The pattern must be stateless (or otherwise thread-safe) for this to be correct.
return IntStream.range(0, chunkCount)
.parallel()
.boxed()
.flatMap(chunkIndex -> {
int from = chunkIndex * chunkSize;
int to = Math.min(from + chunkSize, elements.size());
List<PatternMatch<T>> chunkMatches = new ArrayList<>();
for (int i = from; i < to; i++) {
pattern.match(elements, i).ifPresent(chunkMatches::add);
}
return chunkMatches.stream();
})
.collect(Collectors.toList());
}
// Bloom filter for fast pattern pre-screening
public static class PatternBloomFilter<T> {
private final BitSet bitSet;
private final int size;
private final List<Function<T, Integer>> hashFunctions;
public PatternBloomFilter(int size, int numHashFunctions) {
this.bitSet = new BitSet(size);
this.size = size;
this.hashFunctions = new ArrayList<>();
Random random = new Random(42); // Fixed seed for consistency
for (int i = 0; i < numHashFunctions; i++) {
final int seed = random.nextInt();
hashFunctions.add(element -> Math.abs((element.hashCode() ^ seed) % size));
}
}
public void add(T element) {
for (Function<T, Integer> hashFunction : hashFunctions) {
int index = hashFunction.apply(element);
bitSet.set(index);
}
}
public boolean mightContain(T element) {
for (Function<T, Integer> hashFunction : hashFunctions) {
int index = hashFunction.apply(element);
if (!bitSet.get(index)) {
return false;
}
}
return true;
}
public Stream<T> preFilter(Stream<T> stream, Predicate<T> patternPredicate) {
return stream.filter(element -> {
if (!mightContain(element)) {
return false; // Definitely not matching
}
// Might match, need full verification
return patternPredicate.test(element);
});
}
}
}
// Caching pattern matcher for repeated patterns
public class CachingPatternMatcher<T> {
private final Map<String, List<PatternMatch<T>>> cache;
public CachingPatternMatcher() {
this.cache = new ConcurrentHashMap<>();
}
// Results are keyed by patternId only, so reuse an id only for the same input data.
// On a cache hit the stream is not consumed.
public List<PatternMatch<T>> matchCached(Stream<T> stream, String patternId, 
Supplier<StreamPattern<T>> patternSupplier) {
return cache.computeIfAbsent(patternId, id -> {
// Use a fresh matcher per pattern; a shared matcher would accumulate every
// pattern ever added and mix their matches into later results
AdvancedStreamPatternMatcher<T> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(patternSupplier.get());
return matcher.matchAll(stream);
});
}
public void clearCache() {
cache.clear();
}
public void invalidatePattern(String patternId) {
cache.remove(patternId);
}
}
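
When input is split into chunks for parallel matching, a match that straddles a chunk boundary is lost if each chunk is matched in isolation. One way to avoid this is to parallelize over start indices while every task reads the same full list. The sketch below illustrates the idea with a literal sub-list search; ParallelIndexScan is a hypothetical class that does not use the StreamPattern interface, and it assumes the list is not modified while the search runs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper (illustration only)
final class ParallelIndexScan {

    // Returns every start index at which needle occurs in haystack, in ascending order
    static <T> List<Integer> findAll(List<T> haystack, List<T> needle) {
        int lastStart = haystack.size() - needle.size();
        if (needle.isEmpty() || lastStart < 0) {
            return Collections.emptyList();
        }
        return IntStream.rangeClosed(0, lastStart)
                .parallel()
                // Every task reads the shared list, so no match is cut by a chunk boundary
                .filter(i -> haystack.subList(i, i + needle.size()).equals(needle))
                .boxed()
                .collect(Collectors.toList());
    }
}
```

Searching for [2, 3, 1] in [1, 2, 3, 1, 2, 3, 1, 2] returns [1, 4]. The result order is deterministic because the index range is an ordered source and Collectors.toList() preserves encounter order even in parallel.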

7. Testing Pattern Matching

import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.*;
import static org.junit.jupiter.api.Assertions.*;
class StreamPatternMatchingTest {
@Test
void testSequencePatternMatching() {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 2, 3, 4, 1, 2, 3);
AdvancedStreamPatternMatcher<Integer> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(new SequencePattern<>(
n -> n == 2,
n -> n == 3,
n -> n == 4
));
List<PatternMatch<Integer>> matches = matcher.matchAll(numbers.stream());
assertEquals(2, matches.size());
assertEquals(Arrays.asList(2, 3, 4), matches.get(0).getMatchedElements());
assertEquals(Arrays.asList(2, 3, 4), matches.get(1).getMatchedElements());
}
@Test
void testRepetitionPattern() {
List<String> words = Arrays.asList("a", "a", "a", "b", "c", "a", "a", "d");
RepetitionPattern<String> pattern = new RepetitionPattern<>(
word -> word.equals("a"), 2, 5
);
AdvancedStreamPatternMatcher<String> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(pattern);
List<PatternMatch<String>> matches = matcher.matchAll(words.stream());
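// matchAll tries every start index, so overlapping matches are reported:
// index 0 ("a", "a", "a"), index 1 ("a", "a") and index 5 ("a", "a")
assertEquals(3, matches.size());
assertEquals(3, matches.get(0).getMatchedElements().size()); // starts at index 0
assertEquals(2, matches.get(1).getMatchedElements().size()); // starts at index 1
assertEquals(2, matches.get(2).getMatchedElements().size()); // starts at index 5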
}
@Test
void testFinancialPatterns() {
List<StockTick> ticks = Arrays.asList(
new StockTick("AAPL", 150.0, 1000, LocalDateTime.now()),
new StockTick("AAPL", 152.0, 1200, LocalDateTime.now().plusMinutes(1)),
new StockTick("AAPL", 151.0, 800, LocalDateTime.now().plusMinutes(2)),
new StockTick("AAPL", 153.0, 1500, LocalDateTime.now().plusMinutes(3))
);
List<PatternMatch<StockTick>> surges = FinancialPatternMatcher.findVolumeSurges(
ticks.stream(), 1.5
);
assertFalse(surges.isEmpty());
}
}
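
A matcher that tries every start index reports overlapping matches, as with runs of repeated elements. If only non-overlapping matches are wanted, a greedy leftmost pass over the sorted match ranges is usually enough. The following sketch assumes each match is reduced to a {start, end} index pair; NonOverlapping is a hypothetical helper, not part of the framework above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (illustration only)
final class NonOverlapping {

    // Each match is {startInclusive, endInclusive}; the input is assumed
    // to be sorted by start index
    static List<int[]> leftmost(List<int[]> matches) {
        List<int[]> kept = new ArrayList<>();
        int nextFree = 0; // first index not covered by an accepted match
        for (int[] m : matches) {
            if (m[0] >= nextFree) {
                kept.add(m);
                nextFree = m[1] + 1;
            }
        }
        return kept;
    }
}
```

Given the ranges {0, 2}, {1, 2} and {5, 6}, leftmost keeps {0, 2} and {5, 6} and drops {1, 2}, because it starts inside the first accepted match.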

Best Practices for Stream Pattern Matching

1. Performance Considerations:

  • Use parallel streams for large datasets, and only with stateless, thread-safe patterns
  • Implement caching for repeated patterns
  • Consider bloom filters for pre-screening
  • Optimize predicate complexity

2. Memory Management:

  • Process elements incrementally where the pattern allows, rather than collecting the whole stream into a list
  • Use primitive streams for numeric patterns
  • Implement custom collectors for complex patterns
  • Consider external memory patterns for very large datasets
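
As an illustration of the primitive-stream advice, rising runs in an IntStream can be detected over an int[] without creating a boxed Integer per element. This is a minimal sketch: PrimitiveRuns is a hypothetical class, and it reports index ranges rather than copies of the matched values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical helper (illustration only)
final class PrimitiveRuns {

    // Returns {startInclusive, endInclusive} index pairs of strictly rising
    // runs that contain at least minLength values
    static List<int[]> risingRuns(IntStream values, int minLength) {
        int[] data = values.toArray(); // one int[] instead of a list of boxed Integers
        List<int[]> runs = new ArrayList<>();
        int runStart = 0;
        for (int i = 1; i <= data.length; i++) {
            // A run ends at the end of the data or when the value stops rising
            boolean runEnds = i == data.length || data[i] <= data[i - 1];
            if (runEnds) {
                if (i - runStart >= minLength) {
                    runs.add(new int[] { runStart, i - 1 });
                }
                runStart = i;
            }
        }
        return runs;
    }
}
```

For the values 1, 2, 3, 2, 5, 6, 7, 7 and a minimum length of 3, risingRuns returns the index ranges {0, 2} and {3, 6}.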

3. Pattern Design:

  • Keep patterns focused and specific
  • Use composition to build complex patterns
  • Implement pattern validation
  • Document pattern semantics clearly
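
Composition can be sketched with a couple of combinators. The interface below is a hypothetical, simplified stand-in for StreamPattern: instead of a PatternMatch it returns only the length of the match at a start index, which keeps the combinators short. The names LengthPattern, one, then and or are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for StreamPattern (illustration only):
// returns the length of the match beginning at startIndex, if there is one
interface LengthPattern<T> {

    OptionalInt matchLength(List<T> elements, int startIndex);

    // Matches a single element that satisfies the predicate
    static <T> LengthPattern<T> one(Predicate<T> predicate) {
        return (elements, start) ->
                start < elements.size() && predicate.test(elements.get(start))
                        ? OptionalInt.of(1)
                        : OptionalInt.empty();
    }

    // Sequential composition: this pattern, then next, starting where this one ended
    default LengthPattern<T> then(LengthPattern<T> next) {
        return (elements, start) -> {
            OptionalInt first = matchLength(elements, start);
            if (!first.isPresent()) {
                return OptionalInt.empty();
            }
            OptionalInt second = next.matchLength(elements, start + first.getAsInt());
            return second.isPresent()
                    ? OptionalInt.of(first.getAsInt() + second.getAsInt())
                    : OptionalInt.empty();
        };
    }

    // Alternative: try this pattern first and fall back to other
    default LengthPattern<T> or(LengthPattern<T> other) {
        return (elements, start) -> {
            OptionalInt first = matchLength(elements, start);
            return first.isPresent() ? first : other.matchLength(elements, start);
        };
    }
}
```

With LengthPattern<String> warn = LengthPattern.one("WARN"::equals) and a similar error pattern, warn.then(error) matches two elements at index 1 of ["INFO", "WARN", "ERROR"], and warn.or(error) matches one element at either index 1 or index 2.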

4. Error Handling:

// The class needs no type parameter of its own: safeMatch declares its own <T>
public class SafePatternMatcher {
public static <T> List<PatternMatch<T>> safeMatch(Stream<T> stream, 
StreamPattern<T> pattern) {
try {
AdvancedStreamPatternMatcher<T> matcher = new AdvancedStreamPatternMatcher<>();
matcher.addPattern(pattern);
return matcher.matchAll(stream);
} catch (Exception e) {
System.err.println("Pattern matching failed: " + e.getMessage());
return Collections.emptyList();
}
}
}

Conclusion

Pattern matching in Java streams enables sophisticated data analysis and processing capabilities. Key takeaways:

  1. Sequence Detection: Identify ordered patterns in data streams
  2. Condition Patterns: Find elements matching complex conditions
  3. Temporal Patterns: Detect time-based sequences and relationships
  4. Stateful Patterns: Implement complex state machines for pattern recognition
  5. Performance Optimization: Use parallel processing and caching for large datasets

Implementation Checklist:

  • ✅ Define clear pattern specifications
  • ✅ Implement appropriate pattern matching algorithms
  • ✅ Consider performance and memory requirements
  • ✅ Add comprehensive testing
  • ✅ Implement proper error handling
  • ✅ Consider parallel processing for large datasets
  • ✅ Add monitoring and metrics

Pattern matching in streams is particularly valuable for:

  • Financial analysis (trading patterns, anomalies)
  • Log analysis (error patterns, security threats)
  • IoT data processing (sensor patterns, predictive maintenance)
  • Network monitoring (intrusion detection, performance issues)
  • Business intelligence (customer behavior patterns, trends)

By mastering these pattern matching techniques, you can extract valuable insights from streaming data and build sophisticated real-time data processing applications.
