Introduction to Log Anomaly Detection
Log anomaly detection involves identifying unusual patterns, outliers, or suspicious activities in application logs. This is crucial for security monitoring, performance optimization, and operational intelligence.
Types of Log Anomalies
- Volume Anomalies: Unexpected spikes or drops in log frequency
- Pattern Anomalies: Unusual log message formats or sequences
- Content Anomalies: Suspicious keywords or error patterns
- Temporal Anomalies: Logs occurring at unexpected times
- Statistical Anomalies: Deviations from normal statistical patterns
Core Implementation
Dependencies
Add to your pom.xml:
<properties>
<java.version>11</java.version>
<slf4j.version>2.0.9</slf4j.version>
<jackson.version>2.15.2</jackson.version>
<apache.commons.version>3.13.0</apache.commons.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Math Utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<!-- Collection Utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<!-- For real-time processing -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
</dependencies>
Core Data Models
package com.anomalydetection.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;
/**
 * A single structured log record flowing through the anomaly-detection pipeline.
 *
 * <p>Equality and hashing are id-based. Both constructors generate a UUID id,
 * so two distinct entries are never accidentally equal. (Previously the no-arg
 * constructor left {@code id} null, making all default-constructed entries
 * mutually equal.) Jackson deserialization overwrites the generated id via
 * {@link #setId(String)} when the payload carries one.</p>
 */
public class LogEntry {

    private String id;
    private LogLevel level;
    private String logger;
    private String message;
    private String thread;

    // Wire format for timestamps, e.g. "2024-01-31 23:59:59.123".
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
    private LocalDateTime timestamp;

    private Map<String, Object> context;
    private String stackTrace;
    private String source;
    private double numericValue; // For metric logs; 0.0 means "no metric attached"

    /** Standard severity levels, least to most severe. */
    public enum LogLevel {
        TRACE, DEBUG, INFO, WARN, ERROR, FATAL
    }

    /**
     * No-arg constructor (required by Jackson). Generates an id eagerly so
     * id-based equals/hashCode are well defined even before setters run.
     */
    public LogEntry() {
        this.id = generateId();
    }

    /** Creates a populated entry with a freshly generated id. */
    public LogEntry(LogLevel level, String message, String logger, LocalDateTime timestamp) {
        this.level = level;
        this.message = message;
        this.logger = logger;
        this.timestamp = timestamp;
        this.id = generateId();
    }

    private String generateId() {
        return java.util.UUID.randomUUID().toString();
    }

    // Getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public LogLevel getLevel() { return level; }
    public void setLevel(LogLevel level) { this.level = level; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public String getLogger() { return logger; }
    public void setLogger(String logger) { this.logger = logger; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public Map<String, Object> getContext() { return context; }
    public void setContext(Map<String, Object> context) { this.context = context; }
    public String getStackTrace() { return stackTrace; }
    public void setStackTrace(String stackTrace) { this.stackTrace = stackTrace; }
    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }
    public double getNumericValue() { return numericValue; }
    public void setNumericValue(double numericValue) { this.numericValue = numericValue; }
    public String getThread() { return thread; }
    public void setThread(String thread) { this.thread = thread; }

    /** Identity is the id alone; all other fields are ignored. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogEntry logEntry = (LogEntry) o;
        return Objects.equals(id, logEntry.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public String toString() {
        return String.format("LogEntry{level=%s, logger='%s', message='%s', timestamp=%s}",
                level, logger, message, timestamp);
    }
}
package com.anomalydetection.model;
import java.time.LocalDateTime;
import java.util.Map;
/**
 * A finding produced by one of the anomaly detectors: a classification, a
 * human-readable description, a severity score, the triggering log entry,
 * and detector-specific context values.
 */
public class Anomaly {

    /** Classification of the detected anomaly. */
    public enum AnomalyType {
        VOLUME_SPIKE, VOLUME_DROP, UNUSUAL_PATTERN,
        SUSPICIOUS_CONTENT, ERROR_SPIKE, UNUSUAL_TIMING,
        STATISTICAL_OUTLIER, SEQUENCE_VIOLATION
    }

    private String id;
    private AnomalyType type;
    private String description;
    private double severity;
    private LocalDateTime detectedAt;
    private LogEntry relatedLogEntry;
    private Map<String, Object> detectionContext;

    /** Creates an empty anomaly with a fresh id, stamped with the current time. */
    public Anomaly() {
        this.id = java.util.UUID.randomUUID().toString();
        this.detectedAt = LocalDateTime.now();
    }

    /** Creates a populated anomaly; id and detection timestamp are generated. */
    public Anomaly(AnomalyType type, String description, double severity, LogEntry relatedLogEntry) {
        this();
        this.type = type;
        this.description = description;
        this.severity = severity;
        this.relatedLogEntry = relatedLogEntry;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public AnomalyType getType() {
        return type;
    }

    public void setType(AnomalyType type) {
        this.type = type;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public double getSeverity() {
        return severity;
    }

    public void setSeverity(double severity) {
        this.severity = severity;
    }

    public LocalDateTime getDetectedAt() {
        return detectedAt;
    }

    public void setDetectedAt(LocalDateTime detectedAt) {
        this.detectedAt = detectedAt;
    }

    public LogEntry getRelatedLogEntry() {
        return relatedLogEntry;
    }

    public void setRelatedLogEntry(LogEntry relatedLogEntry) {
        this.relatedLogEntry = relatedLogEntry;
    }

    public Map<String, Object> getDetectionContext() {
        return detectionContext;
    }

    public void setDetectionContext(Map<String, Object> detectionContext) {
        this.detectionContext = detectionContext;
    }

    @Override
    public String toString() {
        return String.format("Anomaly{type=%s, severity=%.2f, description='%s', detectedAt=%s}",
                type, severity, description, detectedAt);
    }
}
Statistical Anomaly Detector
package com.anomalydetection.detector;
import com.anomalydetection.model.Anomaly;
import com.anomalydetection.model.LogEntry;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Detects statistical outliers in log volumes and numeric log values using a
 * sliding-window z-score test backed by commons-math {@code DescriptiveStatistics}.
 *
 * <p>Thread-safety: all reads/writes of the per-key statistics happen under a
 * single lock, because {@code DescriptiveStatistics} itself is not thread-safe.
 * The lock is acquired <em>before</em> the {@code try} block so the
 * {@code finally} can never attempt to unlock a lock that was not acquired.</p>
 */
public class StatisticalAnomalyDetector {

    private static final Logger logger = LoggerFactory.getLogger(StatisticalAnomalyDetector.class);

    private final Map<String, DescriptiveStatistics> statisticsMap;
    private final int windowSize;          // sliding-window length per key
    private final double zScoreThreshold;  // e.g. 3.0 for a "3 sigma" rule
    private final ReentrantLock lock;

    /**
     * @param windowSize      number of recent samples retained per key
     * @param zScoreThreshold absolute z-score above which a sample is anomalous
     */
    public StatisticalAnomalyDetector(int windowSize, double zScoreThreshold) {
        this.statisticsMap = new ConcurrentHashMap<>();
        this.windowSize = windowSize;
        this.zScoreThreshold = zScoreThreshold;
        this.lock = new ReentrantLock();
    }

    /**
     * Records a per-interval log count for {@code logKey} and reports an
     * anomaly when the count's z-score exceeds the threshold.
     *
     * @return an anomaly typed {@code VOLUME_SPIKE} when the count is above the
     *         window mean and {@code VOLUME_DROP} when below (the previous
     *         implementation labeled drops as spikes), or empty
     */
    public Optional<Anomaly> detectVolumeAnomaly(String logKey, long count, LocalDateTime timestamp) {
        lock.lock(); // acquire outside try: finally must only unlock a held lock
        try {
            DescriptiveStatistics stats = statisticsMap.computeIfAbsent(
                    logKey, k -> new DescriptiveStatistics(windowSize));
            stats.addValue(count);

            // Require a minimum sample before the z-score is meaningful.
            if (stats.getN() < Math.max(10, windowSize / 2)) {
                return Optional.empty();
            }

            double mean = stats.getMean();
            double stdDev = stats.getStandardDeviation();
            if (stdDev == 0) {
                stdDev = 1.0; // constant series: avoid division by zero
            }
            double zScore = Math.abs((count - mean) / stdDev);

            if (zScore > zScoreThreshold) {
                String description = String.format(
                        "Volume anomaly detected for %s: count=%d, mean=%.2f, z-score=%.2f",
                        logKey, count, mean, zScore);
                double severity = calculateSeverity(zScore, zScoreThreshold);
                LogEntry syntheticLog = createSyntheticLogEntry(
                        "Volume anomaly: " + description, timestamp);
                // Classify by direction: counts below the mean are drops, not spikes.
                Anomaly.AnomalyType type = count >= mean
                        ? Anomaly.AnomalyType.VOLUME_SPIKE
                        : Anomaly.AnomalyType.VOLUME_DROP;
                Anomaly anomaly = new Anomaly(type, description, severity, syntheticLog);
                anomaly.setDetectionContext(Map.of(
                        "zScore", zScore,
                        "mean", mean,
                        "stdDev", stdDev,
                        "count", count,
                        "logKey", logKey));
                logger.warn("Statistical anomaly detected: {}", description);
                return Optional.of(anomaly);
            }
            return Optional.empty();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Records a numeric value attached to a log entry (keyed by logger+level)
     * and reports a {@code STATISTICAL_OUTLIER} when it deviates past the
     * z-score threshold.
     */
    public Optional<Anomaly> detectValueAnomaly(LogEntry logEntry, double value) {
        String key = logEntry.getLogger() + ":" + logEntry.getLevel();
        lock.lock(); // acquire outside try (see class doc)
        try {
            DescriptiveStatistics stats = statisticsMap.computeIfAbsent(
                    key, k -> new DescriptiveStatistics(windowSize));
            stats.addValue(value);

            if (stats.getN() < Math.max(10, windowSize / 2)) {
                return Optional.empty();
            }

            double mean = stats.getMean();
            double stdDev = stats.getStandardDeviation();
            if (stdDev == 0) stdDev = 1.0;
            double zScore = Math.abs((value - mean) / stdDev);

            if (zScore > zScoreThreshold) {
                String description = String.format(
                        "Value anomaly in %s: value=%.2f, mean=%.2f, z-score=%.2f",
                        key, value, mean, zScore);
                double severity = calculateSeverity(zScore, zScoreThreshold);
                Anomaly anomaly = new Anomaly(
                        Anomaly.AnomalyType.STATISTICAL_OUTLIER,
                        description,
                        severity,
                        logEntry);
                anomaly.setDetectionContext(Map.of(
                        "zScore", zScore,
                        "mean", mean,
                        "stdDev", stdDev,
                        "value", value,
                        "metricKey", key));
                return Optional.of(anomaly);
            }
            return Optional.empty();
        } finally {
            lock.unlock();
        }
    }

    /** Maps how far past the threshold the z-score is onto a [0, 1] severity. */
    private double calculateSeverity(double zScore, double threshold) {
        double normalized = (zScore - threshold) / threshold;
        return Math.min(Math.max(normalized, 0.0), 1.0);
    }

    /** Builds a WARN-level placeholder entry to attach to volume anomalies. */
    private LogEntry createSyntheticLogEntry(String message, LocalDateTime timestamp) {
        LogEntry log = new LogEntry();
        log.setId(UUID.randomUUID().toString());
        log.setLevel(LogEntry.LogLevel.WARN);
        log.setMessage(message);
        log.setLogger("AnomalyDetector");
        log.setTimestamp(timestamp);
        return log;
    }

    /**
     * Returns a snapshot of the statistics map. Note this is a shallow copy:
     * the {@code DescriptiveStatistics} values are the live objects.
     */
    public Map<String, DescriptiveStatistics> getStatistics() {
        return new HashMap<>(statisticsMap);
    }

    /** Drops the learned window for one key. */
    public void clearStatistics(String key) {
        statisticsMap.remove(key);
    }

    /** Drops all learned windows. */
    public void clearAllStatistics() {
        statisticsMap.clear();
    }
}
Pattern-Based Anomaly Detector
package com.anomalydetection.detector;
import com.anomalydetection.model.Anomaly;
import com.anomalydetection.model.LogEntry;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
/**
 * Detects content- and pattern-level anomalies in individual log entries:
 * suspicious security keywords, rarely-seen message "shapes", and matches
 * against a registry of known problem patterns.
 *
 * <p>Thread-safety: all shared state lives in concurrent collections and the
 * pattern-frequency update uses an atomic {@code merge}.</p>
 */
public class PatternAnomalyDetector {

    private final Map<String, LogPattern> knownPatterns;
    // pattern signature -> number of times it has been observed
    private final Map<String, Integer> patternCounts;
    private final Set<String> suspiciousKeywords;
    // A signature with relative frequency below this is flagged as unusual.
    private final double unusualPatternThreshold;

    public PatternAnomalyDetector(double unusualPatternThreshold) {
        this.knownPatterns = new ConcurrentHashMap<>();
        this.patternCounts = new ConcurrentHashMap<>();
        this.suspiciousKeywords = ConcurrentHashMap.newKeySet();
        this.unusualPatternThreshold = unusualPatternThreshold;
        initializeSuspiciousKeywords();
        initializeCommonPatterns();
    }

    /** Seeds the keyword list with common security-incident phrases (lowercase). */
    private void initializeSuspiciousKeywords() {
        suspiciousKeywords.addAll(Arrays.asList(
                "sql injection", "xss", "csrf", "unauthorized", "access denied",
                "authentication failed", "brute force", "malware", "virus detected",
                "privilege escalation", "data breach", "credential theft"
        ));
    }

    /** Registers the built-in known problem patterns. */
    private void initializeCommonPatterns() {
        addKnownPattern("SQL_ERROR",
                Pattern.compile("(?i).*(sql(error|exception)|database.*error).*"),
                "Database errors");
        addKnownPattern("AUTH_FAILURE",
                Pattern.compile("(?i).*(authentication.*failed|login.*failed|invalid.*credential).*"),
                "Authentication failures");
        addKnownPattern("MEMORY_ISSUE",
                Pattern.compile("(?i).*(outofmemory|memory.*exhausted|heap.*space).*"),
                "Memory-related issues");
        addKnownPattern("NETWORK_ERROR",
                Pattern.compile("(?i).*(connection.*(timeout|refused)|network.*error).*"),
                "Network connectivity issues");
    }

    /** Registers (or replaces) a known pattern under the given id. */
    public void addKnownPattern(String patternId, Pattern regex, String description) {
        knownPatterns.put(patternId, new LogPattern(patternId, regex, description));
    }

    /** Adds a keyword to the suspicious list (matched case-insensitively). */
    public void addSuspiciousKeyword(String keyword) {
        suspiciousKeywords.add(keyword.toLowerCase());
    }

    /**
     * Runs all three pattern checks against one log entry.
     *
     * @return all anomalies found; empty when the entry is clean or has no message
     */
    public List<Anomaly> detectPatternAnomalies(LogEntry logEntry) {
        List<Anomaly> anomalies = new ArrayList<>();
        // Entries without a message cannot be pattern-matched; previously this
        // would have thrown an NPE inside the keyword scan.
        if (logEntry.getMessage() == null) {
            return anomalies;
        }
        anomalies.addAll(detectSuspiciousContent(logEntry));
        anomalies.addAll(detectUnusualPatterns(logEntry));
        anomalies.addAll(detectPatternViolations(logEntry));
        return anomalies;
    }

    /** Flags any suspicious keyword contained in the (lowercased) message. */
    private List<Anomaly> detectSuspiciousContent(LogEntry logEntry) {
        List<Anomaly> anomalies = new ArrayList<>();
        String message = logEntry.getMessage().toLowerCase();

        for (String keyword : suspiciousKeywords) {
            if (message.contains(keyword)) {
                String description = String.format(
                        "Suspicious keyword detected: '%s' in log message", keyword);
                Anomaly anomaly = new Anomaly(
                        Anomaly.AnomalyType.SUSPICIOUS_CONTENT,
                        description,
                        0.8, // High severity for security issues
                        logEntry);
                anomaly.setDetectionContext(Map.of(
                        "keyword", keyword,
                        "message", logEntry.getMessage()));
                anomalies.add(anomaly);
            }
        }
        return anomalies;
    }

    /**
     * Tracks how often each message signature occurs and flags signatures
     * whose relative frequency falls below the threshold (once at least 100
     * signatures in total have been observed).
     */
    private List<Anomaly> detectUnusualPatterns(LogEntry logEntry) {
        List<Anomaly> anomalies = new ArrayList<>();
        String patternSignature = extractPatternSignature(logEntry.getMessage());

        // Atomic increment; the previous get-then-put could lose updates
        // under concurrent access.
        int count = patternCounts.merge(patternSignature, 1, Integer::sum);

        double totalPatterns = patternCounts.values().stream().mapToInt(Integer::intValue).sum();
        double frequency = count / totalPatterns;

        if (frequency < unusualPatternThreshold && totalPatterns > 100) {
            String description = String.format(
                    "Unusual log pattern detected: '%s' (frequency: %.4f)",
                    patternSignature, frequency);
            double severity = calculatePatternSeverity(frequency, unusualPatternThreshold);
            Anomaly anomaly = new Anomaly(
                    Anomaly.AnomalyType.UNUSUAL_PATTERN,
                    description,
                    severity,
                    logEntry);
            anomaly.setDetectionContext(Map.of(
                    "pattern", patternSignature,
                    "frequency", frequency,
                    "threshold", unusualPatternThreshold));
            anomalies.add(anomaly);
        }
        return anomalies;
    }

    /** Flags messages matching any registered known problem pattern. */
    private List<Anomaly> detectPatternViolations(LogEntry logEntry) {
        List<Anomaly> anomalies = new ArrayList<>();
        for (LogPattern pattern : knownPatterns.values()) {
            Matcher matcher = pattern.getRegex().matcher(logEntry.getMessage());
            if (matcher.matches()) {
                String description = String.format(
                        "Known issue pattern detected: %s - %s",
                        pattern.getId(), pattern.getDescription());
                Anomaly anomaly = new Anomaly(
                        Anomaly.AnomalyType.UNUSUAL_PATTERN,
                        description,
                        0.6, // Medium severity for known issues
                        logEntry);
                anomaly.setDetectionContext(Map.of(
                        "patternId", pattern.getId(),
                        "description", pattern.getDescription()));
                anomalies.add(anomaly);
            }
        }
        return anomalies;
    }

    /**
     * Normalizes a message into a signature by replacing variable parts with
     * placeholders. Order matters: the generic digit replacement must run
     * LAST — the previous version ran it first, destroying the digits that
     * the UUID/hex/IP/email patterns need, so those placeholders were never
     * produced. Also fixes the email character class, which contained a
     * literal '|' ({@code [A-Z|a-z]}).
     */
    private String extractPatternSignature(String message) {
        return message
                .replaceAll("[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", "#UUID")
                .replaceAll("0x[0-9a-fA-F]+", "#HEX")
                .replaceAll("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "#EMAIL")
                .replaceAll("\\b(?:[0-9]{1,3}\\.){3}[0-9]{1,3}\\b", "#IP")
                .replaceAll("\\d+", "#NUM");
    }

    /** Lower frequency maps to higher severity, clamped to [0, 1]. */
    private double calculatePatternSeverity(double frequency, double threshold) {
        return Math.max(0, 1 - (frequency / threshold));
    }

    /** Immutable (id, regex, description) triple describing a known pattern. */
    private static class LogPattern {
        private final String id;
        private final Pattern regex;
        private final String description;

        public LogPattern(String id, Pattern regex, String description) {
            this.id = id;
            this.regex = regex;
            this.description = description;
        }

        public String getId() { return id; }
        public Pattern getRegex() { return regex; }
        public String getDescription() { return description; }
    }

    /** Returns a snapshot of signature -> occurrence count. */
    public Map<String, Integer> getPatternStatistics() {
        return new HashMap<>(patternCounts);
    }
}
Sequence Anomaly Detector
package com.anomalydetection.detector;
import com.anomalydetection.model.Anomaly;
import com.anomalydetection.model.LogEntry;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Detects unusual ordered sequences of log entries per source. Keeps a
 * fixed-length buffer of recent entries for each source, reduces each buffer
 * to a sequence signature, and compares it against sequences learned as
 * "normal" for that source.
 *
 * <p>Learning fix: the original only learned a sequence after it had already
 * been judged normal — but with an empty model nothing is ever normal, so
 * nothing was ever learned and every sequence was flagged forever. Every
 * observed sequence is now learned, so the first occurrence may be flagged
 * but repeats become normal.</p>
 */
public class SequenceAnomalyDetector {

    private final Map<String, Deque<LogEntry>> sequenceBuffers;
    // source -> learned "normal" sequence signatures (joined with '|')
    private final Map<String, List<String>> normalSequences;
    private final int sequenceLength;
    private final ReentrantLock lock;

    /** @param sequenceLength number of consecutive entries forming one sequence */
    public SequenceAnomalyDetector(int sequenceLength) {
        this.sequenceBuffers = new ConcurrentHashMap<>();
        this.normalSequences = new ConcurrentHashMap<>();
        this.sequenceLength = sequenceLength;
        this.lock = new ReentrantLock();
    }

    /**
     * Feeds one entry into its source's buffer and, once the buffer is full,
     * checks the resulting sequence against the learned model.
     *
     * @return a SEQUENCE_VIOLATION anomaly when the sequence is not similar
     *         to any learned sequence for the source, otherwise empty
     */
    public Optional<Anomaly> detectSequenceAnomaly(LogEntry logEntry) {
        String sourceKey = logEntry.getSource() != null ? logEntry.getSource() : "default";
        lock.lock(); // acquire outside try: finally must only unlock a held lock
        try {
            Deque<LogEntry> buffer = sequenceBuffers.computeIfAbsent(
                    sourceKey, k -> new ArrayDeque<>(sequenceLength));
            buffer.addLast(logEntry);
            // Keep only the newest sequenceLength entries.
            if (buffer.size() > sequenceLength) {
                buffer.removeFirst();
            }

            if (buffer.size() == sequenceLength) {
                List<String> currentSequence = extractSequenceSignature(buffer);
                boolean normal = isNormalSequence(sourceKey, currentSequence);
                // Learn every observed sequence (see class doc for why this
                // must happen regardless of the verdict).
                learnNormalSequence(sourceKey, currentSequence);

                if (!normal) {
                    String description = String.format(
                            "Unusual log sequence detected for source: %s. Sequence: %s",
                            sourceKey, String.join(" -> ", currentSequence));
                    Anomaly anomaly = new Anomaly(
                            Anomaly.AnomalyType.SEQUENCE_VIOLATION,
                            description,
                            0.7,
                            logEntry);
                    anomaly.setDetectionContext(Map.of(
                            "source", sourceKey,
                            "sequence", currentSequence,
                            "sequenceLength", sequenceLength));
                    return Optional.of(anomaly);
                }
            }
            return Optional.empty();
        } finally {
            lock.unlock();
        }
    }

    /** Maps each buffered entry to "LEVEL:CATEGORY", preserving order. */
    private List<String> extractSequenceSignature(Deque<LogEntry> buffer) {
        List<String> sequence = new ArrayList<>();
        for (LogEntry entry : buffer) {
            String signature = entry.getLevel() + ":" +
                    extractMessageCategory(entry.getMessage());
            sequence.add(signature);
        }
        return sequence;
    }

    /**
     * Buckets a message into a coarse category by keyword. The mixed-case
     * checks ("Exception", "SQL", "http", "login") intentionally match the
     * original casing only, as before; the lowercase checks are case-insensitive.
     */
    private String extractMessageCategory(String message) {
        if (message == null) {
            return "INFO"; // no message to categorize; previously this NPE'd
        }
        String lower = message.toLowerCase(); // hoisted: computed once, not per branch
        if (lower.contains("error") || message.contains("Exception")) {
            return "ERROR";
        } else if (lower.contains("warn")) {
            return "WARN";
        } else if (lower.contains("database") || message.contains("SQL")) {
            return "DB";
        } else if (lower.contains("network") || message.contains("http")) {
            return "NETWORK";
        } else if (lower.contains("auth") || message.contains("login")) {
            return "AUTH";
        } else {
            return "INFO";
        }
    }

    /** True when the sequence is >80% similar to any learned sequence. */
    private boolean isNormalSequence(String sourceKey, List<String> sequence) {
        List<String> knownSequences = normalSequences.get(sourceKey);
        if (knownSequences == null || knownSequences.isEmpty()) {
            return false; // nothing learned yet for this source
        }
        String sequenceStr = String.join("|", sequence);
        for (String knownSequence : knownSequences) {
            if (sequenceSimilarity(sequenceStr, knownSequence) > 0.8) {
                return true;
            }
        }
        return false;
    }

    /** Records a sequence as normal, deduplicated, capped at 100 per source (FIFO). */
    private void learnNormalSequence(String sourceKey, List<String> sequence) {
        List<String> sequences = normalSequences.computeIfAbsent(
                sourceKey, k -> new ArrayList<>());
        String sequenceStr = String.join("|", sequence);
        if (!sequences.contains(sequenceStr)) {
            sequences.add(sequenceStr);
            if (sequences.size() > 100) {
                sequences.remove(0); // evict the oldest
            }
        }
    }

    /**
     * Positional similarity: fraction of positions that agree, normalized by
     * the longer sequence's length.
     */
    private double sequenceSimilarity(String seq1, String seq2) {
        String[] parts1 = seq1.split("\\|");
        String[] parts2 = seq2.split("\\|");
        int matches = 0;
        int minLength = Math.min(parts1.length, parts2.length);
        for (int i = 0; i < minLength; i++) {
            if (parts1[i].equals(parts2[i])) {
                matches++;
            }
        }
        return (double) matches / Math.max(parts1.length, parts2.length);
    }

    /** Pre-seeds the model with a sequence known to be normal. */
    public void addNormalSequence(String sourceKey, List<String> sequence) {
        learnNormalSequence(sourceKey, sequence);
    }

    /**
     * Returns a snapshot of the learned model. Shallow copy: the per-source
     * lists are the live ones.
     */
    public Map<String, List<String>> getLearnedSequences() {
        return new HashMap<>(normalSequences);
    }
}
Real-Time Log Processor
package com.anomalydetection.processor;
import com.anomalydetection.detector.*;
import com.anomalydetection.model.Anomaly;
import com.anomalydetection.model.LogEntry;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * High-throughput log processing pipeline built on the LMAX Disruptor.
 * Incoming entries are published onto a ring buffer and fanned out to three
 * independent handlers (pattern, sequence, statistical counting); detected
 * anomalies are pushed to registered {@link AnomalyListener}s.
 */
public class RealTimeLogProcessor {

    private static final Logger logger = LoggerFactory.getLogger(RealTimeLogProcessor.class);

    private final Disruptor<LogEvent> disruptor;
    private final RingBuffer<LogEvent> ringBuffer;
    private final ExecutorService executor;
    private final StatisticalAnomalyDetector statisticalDetector;
    private final PatternAnomalyDetector patternDetector;
    private final SequenceAnomalyDetector sequenceDetector;
    private final List<AnomalyListener> anomalyListeners;
    private final AtomicLong processedCount;
    private final AtomicLong anomalyCount;
    // key = source:level:minute-of-hour; values recycle hourly and are reset
    // by the metrics task each period.
    private final Map<String, AtomicLong> logCounters;
    private final ScheduledExecutorService metricsScheduler;

    /** @param bufferSize ring buffer capacity; must be a power of two per Disruptor contract */
    public RealTimeLogProcessor(int bufferSize) {
        // The Executor-based Disruptor constructor needs ONE THREAD PER EVENT
        // HANDLER. The original used newSingleThreadExecutor, which can only
        // ever run the first of the three handlers registered below — the
        // sequence and statistical handlers were starved forever. A cached
        // pool provides a thread for each handler on demand.
        this.executor = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "LogProcessor-Disruptor");
            t.setDaemon(true);
            return t;
        });

        // Initialize detectors
        this.statisticalDetector = new StatisticalAnomalyDetector(1000, 3.0); // 3 sigma
        this.patternDetector = new PatternAnomalyDetector(0.001); // 0.1% frequency threshold
        this.sequenceDetector = new SequenceAnomalyDetector(5); // 5-entry sequences

        this.anomalyListeners = new CopyOnWriteArrayList<>();
        this.processedCount = new AtomicLong(0);
        this.anomalyCount = new AtomicLong(0);
        this.logCounters = new ConcurrentHashMap<>();

        // Setup disruptor for high-performance processing.
        // NOTE(review): the Executor overload is deprecated in Disruptor 3.4.x
        // in favor of the ThreadFactory overload — confirm against the pinned
        // disruptor version before upgrading.
        this.disruptor = new Disruptor<>(
                LogEvent::new,
                bufferSize,
                executor,
                ProducerType.MULTI,
                new SleepingWaitStrategy());
        this.ringBuffer = disruptor.getRingBuffer();

        setupEventHandlers();

        // Start periodic metrics collection / counter reset.
        this.metricsScheduler = Executors.newSingleThreadScheduledExecutor();
        startMetricsCollection();
    }

    /**
     * Registers the three handlers. handleEventsWith(...) runs them in
     * parallel: each handler independently sees every published event.
     */
    private void setupEventHandlers() {
        LogEventHandler[] handlers = {
                new LogEventHandler("PatternHandler") {
                    @Override
                    protected void processEvent(LogEntry logEntry, long sequence) {
                        processPatternDetection(logEntry);
                    }
                },
                new LogEventHandler("SequenceHandler") {
                    @Override
                    protected void processEvent(LogEntry logEntry, long sequence) {
                        processSequenceDetection(logEntry);
                    }
                },
                new LogEventHandler("StatisticalHandler") {
                    @Override
                    protected void processEvent(LogEntry logEntry, long sequence) {
                        updateCounters(logEntry);
                    }
                }
        };
        disruptor.handleEventsWith(handlers);
    }

    /** Starts the disruptor; must be called before {@link #processLog(LogEntry)}. */
    public void start() {
        logger.info("Starting RealTimeLogProcessor");
        disruptor.start();
    }

    /** Drains the disruptor and shuts down both executors, with a 5s grace period each. */
    public void stop() {
        logger.info("Stopping RealTimeLogProcessor");
        disruptor.shutdown();
        executor.shutdown();
        metricsScheduler.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
            if (!metricsScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                metricsScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            metricsScheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes one entry onto the ring buffer. Blocks when the buffer is
     * full (backpressure via {@code ringBuffer.next()}).
     */
    public void processLog(LogEntry logEntry) {
        long sequence = ringBuffer.next();
        try {
            LogEvent event = ringBuffer.get(sequence);
            event.setLogEntry(logEntry);
        } finally {
            // Publish in finally so a claimed slot is never left unpublished.
            ringBuffer.publish(sequence);
        }
        processedCount.incrementAndGet();
    }

    /** Pattern handler body; detector errors are logged, never propagated. */
    private void processPatternDetection(LogEntry logEntry) {
        try {
            List<Anomaly> anomalies = patternDetector.detectPatternAnomalies(logEntry);
            for (Anomaly anomaly : anomalies) {
                notifyAnomalyListeners(anomaly);
            }
        } catch (Exception e) {
            logger.error("Error in pattern detection for log: {}", logEntry.getId(), e);
        }
    }

    /** Sequence handler body; detector errors are logged, never propagated. */
    private void processSequenceDetection(LogEntry logEntry) {
        try {
            Optional<Anomaly> anomaly = sequenceDetector.detectSequenceAnomaly(logEntry);
            anomaly.ifPresent(this::notifyAnomalyListeners);
        } catch (Exception e) {
            logger.error("Error in sequence detection for log: {}", logEntry.getId(), e);
        }
    }

    /** Statistical handler body: per-minute volume counting plus value checks. */
    private void updateCounters(LogEntry logEntry) {
        String minuteKey = String.format("%s:%s:%d",
                logEntry.getSource(),
                logEntry.getLevel(),
                logEntry.getTimestamp().getMinute());
        AtomicLong counter = logCounters.computeIfAbsent(minuteKey, k -> new AtomicLong(0));
        long count = counter.incrementAndGet();

        // Sample the volume check every 10 increments to bound detector cost.
        if (count % 10 == 0) {
            Optional<Anomaly> anomaly = statisticalDetector.detectVolumeAnomaly(
                    minuteKey, count, logEntry.getTimestamp());
            anomaly.ifPresent(this::notifyAnomalyListeners);
        }

        // Value anomaly check only when the entry carries a metric
        // (0 is treated as "no metric attached").
        if (logEntry.getNumericValue() != 0) {
            Optional<Anomaly> anomaly = statisticalDetector.detectValueAnomaly(
                    logEntry, logEntry.getNumericValue());
            anomaly.ifPresent(this::notifyAnomalyListeners);
        }
    }

    /**
     * Every minute: feed each counter's period total to the volume detector,
     * reset the counters, and log processing metrics. The original only
     * logged metrics when the total count happened to be an exact multiple
     * of 1000 at the tick, which effectively never fired.
     */
    private void startMetricsCollection() {
        metricsScheduler.scheduleAtFixedRate(() -> {
            try {
                for (Map.Entry<String, AtomicLong> entry : logCounters.entrySet()) {
                    String key = entry.getKey();
                    long count = entry.getValue().get();
                    Optional<Anomaly> anomaly = statisticalDetector.detectVolumeAnomaly(
                            key, count, java.time.LocalDateTime.now());
                    anomaly.ifPresent(this::notifyAnomalyListeners);
                    // Reset counter for next period. NOTE(review): increments
                    // landing between get() and set(0) are lost — acceptable
                    // for coarse per-minute metrics, but verify.
                    entry.getValue().set(0);
                }
                logger.info("Processing metrics - Total: {}, Anomalies: {}",
                        processedCount.get(), anomalyCount.get());
            } catch (Exception e) {
                logger.error("Error in metrics collection", e);
            }
        }, 60, 60, TimeUnit.SECONDS); // Run every minute
    }

    public void addAnomalyListener(AnomalyListener listener) {
        anomalyListeners.add(listener);
    }

    public void removeAnomalyListener(AnomalyListener listener) {
        anomalyListeners.remove(listener);
    }

    /** Counts the anomaly and fans it out; listener failures are isolated. */
    private void notifyAnomalyListeners(Anomaly anomaly) {
        anomalyCount.incrementAndGet();
        for (AnomalyListener listener : anomalyListeners) {
            try {
                listener.onAnomalyDetected(anomaly);
            } catch (Exception e) {
                logger.error("Error notifying anomaly listener", e);
            }
        }
    }

    /** Mutable ring-buffer slot; instances are pre-allocated and reused. */
    private static class LogEvent {
        private LogEntry logEntry;

        public void setLogEntry(LogEntry logEntry) {
            this.logEntry = logEntry;
        }

        public LogEntry getLogEntry() {
            return logEntry;
        }
    }

    /** Named Disruptor handler skeleton; skips slots with no entry set. */
    private abstract static class LogEventHandler implements EventHandler<LogEvent> {
        private final String name;

        public LogEventHandler(String name) {
            this.name = name;
        }

        @Override
        public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
            if (event.getLogEntry() != null) {
                processEvent(event.getLogEntry(), sequence);
            }
        }

        protected abstract void processEvent(LogEntry logEntry, long sequence);
    }

    /** Callback for anomalies found by any of the handlers. */
    public interface AnomalyListener {
        void onAnomalyDetected(Anomaly anomaly);
    }

    // Statistics getters
    public long getProcessedCount() { return processedCount.get(); }
    public long getAnomalyCount() { return anomalyCount.get(); }

    /** Anomalies per processed entry; 0.0 before anything is processed. */
    public double getAnomalyRate() {
        return processedCount.get() > 0 ?
                (double) anomalyCount.get() / processedCount.get() : 0.0;
    }
}
Alert Manager and Notifications
package com.anomalydetection.alert;
import com.anomalydetection.model.Anomaly;
import com.anomalydetection.processor.RealTimeLogProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Routes detected anomalies to registered notifiers, applying per-type alert
 * rules and a per-key cooldown window so repeated anomalies do not flood the
 * notification channels.
 *
 * <p>Thread-safety: rules and cooldown bookkeeping use concurrent maps, and
 * the notifier list is copy-on-write so notifiers can be registered while
 * alerts are being dispatched (the original used a plain ArrayList, which is
 * unsafe to iterate while another thread calls {@code addNotifier}).</p>
 */
public class AlertManager implements RealTimeLogProcessor.AnomalyListener {

    private static final Logger logger = LoggerFactory.getLogger(AlertManager.class);

    private final Map<Anomaly.AnomalyType, AlertRule> alertRules;
    private final List<AlertNotifier> notifiers;
    private final ScheduledExecutorService scheduler;
    // alertKey -> epoch millis of the last alert sent for that key
    private final Map<String, Long> lastAlertTime;
    private final long alertCooldownMs;

    /** @param alertCooldownMs minimum gap between alerts for the same key */
    public AlertManager(long alertCooldownMs) {
        this.alertRules = new ConcurrentHashMap<>();
        // Copy-on-write: dispatch iterates concurrently with registration.
        // (Fully qualified to avoid touching this section's import block.)
        this.notifiers = new java.util.concurrent.CopyOnWriteArrayList<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.lastAlertTime = new ConcurrentHashMap<>();
        this.alertCooldownMs = alertCooldownMs;
        setupDefaultRules();
        startAlertCleanup();
    }

    /** Installs the default severity-threshold rules per anomaly type. */
    private void setupDefaultRules() {
        // High severity anomalies
        addAlertRule(Anomaly.AnomalyType.SUSPICIOUS_CONTENT,
                anomaly -> anomaly.getSeverity() > 0.7);
        // Volume spikes
        addAlertRule(Anomaly.AnomalyType.VOLUME_SPIKE,
                anomaly -> anomaly.getSeverity() > 0.8);
        // Sequence violations
        addAlertRule(Anomaly.AnomalyType.SEQUENCE_VIOLATION,
                anomaly -> anomaly.getSeverity() > 0.6);
    }

    /** Registers (or replaces) the rule for one anomaly type. */
    public void addAlertRule(Anomaly.AnomalyType type, AlertRule rule) {
        alertRules.put(type, rule);
    }

    public void addNotifier(AlertNotifier notifier) {
        notifiers.add(notifier);
    }

    /** Anomaly types without a registered rule are silently ignored. */
    @Override
    public void onAnomalyDetected(Anomaly anomaly) {
        AlertRule rule = alertRules.get(anomaly.getType());
        if (rule != null && rule.shouldAlert(anomaly)) {
            triggerAlert(anomaly);
        }
    }

    /** Builds and dispatches the alert unless the key is still in cooldown. */
    private void triggerAlert(Anomaly anomaly) {
        String alertKey = generateAlertKey(anomaly);
        long currentTime = System.currentTimeMillis();
        long lastTime = lastAlertTime.getOrDefault(alertKey, 0L);

        // Suppress repeats within the cooldown window.
        if (currentTime - lastTime < alertCooldownMs) {
            logger.debug("Alert suppressed due to cooldown: {}", alertKey);
            return;
        }
        lastAlertTime.put(alertKey, currentTime);

        Alert alert = new Alert(
                "LOG_ANOMALY_" + anomaly.getType(),
                String.format("Log Anomaly Detected: %s", anomaly.getDescription()),
                anomaly.getSeverity(),
                anomaly,
                System.currentTimeMillis());

        // Notify all notifiers; one failing notifier must not block the rest.
        for (AlertNotifier notifier : notifiers) {
            try {
                notifier.notify(alert);
            } catch (Exception e) {
                logger.error("Error sending alert via notifier: {}", notifier.getClass().getSimpleName(), e);
            }
        }
        logger.warn("Alert triggered: {}", alert.getDescription());
    }

    /**
     * Cooldown key: anomaly type plus originating source. Null-safe: the
     * original NPE'd when the anomaly had no related log entry.
     */
    private String generateAlertKey(Anomaly anomaly) {
        String source = (anomaly.getRelatedLogEntry() == null)
                ? "unknown"
                : String.valueOf(anomaly.getRelatedLogEntry().getSource());
        return String.format("%s:%s", anomaly.getType(), source);
    }

    /** Hourly sweep evicting cooldown entries older than twice the cooldown. */
    private void startAlertCleanup() {
        scheduler.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();
            long cutoffTime = currentTime - (alertCooldownMs * 2);
            lastAlertTime.entrySet().removeIf(entry -> entry.getValue() < cutoffTime);
        }, 1, 1, TimeUnit.HOURS);
    }

    /** Stops the cleanup scheduler with a 5s grace period. */
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Decides whether an anomaly of the rule's type should produce an alert. */
    public interface AlertRule {
        boolean shouldAlert(Anomaly anomaly);
    }

    /** Delivery channel for alerts (logging, console, email, ...). */
    public interface AlertNotifier {
        void notify(Alert alert);
    }

    /** Immutable alert payload handed to notifiers. */
    public static class Alert {
        private final String id;
        private final String type;
        private final String description;
        private final double severity;
        private final Anomaly cause;
        private final long timestamp;

        public Alert(String type, String description, double severity, Anomaly cause, long timestamp) {
            this.id = UUID.randomUUID().toString();
            this.type = type;
            this.description = description;
            this.severity = severity;
            this.cause = cause;
            this.timestamp = timestamp;
        }

        // Getters
        public String getId() { return id; }
        public String getType() { return type; }
        public String getDescription() { return description; }
        public double getSeverity() { return severity; }
        public Anomaly getCause() { return cause; }
        public long getTimestamp() { return timestamp; }

        @Override
        public String toString() {
            return String.format("Alert{type=%s, severity=%.2f, description='%s'}",
                    type, severity, description);
        }
    }
}
// Example notifier implementations (note: each public class below must live in its own .java file to compile)
package com.anomalydetection.alert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * AlertNotifier that records alerts through SLF4J at ERROR level.
 */
public class LoggingAlertNotifier implements AlertManager.AlertNotifier {
private static final Logger logger = LoggerFactory.getLogger(LoggingAlertNotifier.class);
@Override
public void notify(AlertManager.Alert alert) {
logger.error("SECURITY ALERT - {}: {}", alert.getType(), alert.getDescription());
}
}
/**
 * AlertNotifier that writes alerts to standard error for local/dev visibility.
 *
 * NOTE(review): a second public top-level class cannot share a .java file with
 * LoggingAlertNotifier — move this to its own file or drop the 'public' modifier.
 */
public class ConsoleAlertNotifier implements AlertManager.AlertNotifier {
@Override
public void notify(AlertManager.Alert alert) {
System.err.printf("[ALERT] %s - %s (Severity: %.2f)%n",
alert.getType(), alert.getDescription(), alert.getSeverity());
}
}
Spring Boot Integration
package com.anomalydetection.config;
import com.anomalydetection.alert.AlertManager;
import com.anomalydetection.alert.ConsoleAlertNotifier;
import com.anomalydetection.alert.LoggingAlertNotifier;
import com.anomalydetection.processor.RealTimeLogProcessor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
/**
 * Spring configuration wiring the anomaly-detection pipeline: a started
 * RealTimeLogProcessor and an AlertManager subscribed to it, with buffer size
 * and alert cooldown bound from "anomaly.detection.*" properties.
 */
@Configuration
@ConfigurationProperties(prefix = "anomaly.detection")
public class AnomalyDetectionConfig {
// Ring-buffer size passed to RealTimeLogProcessor (anomaly.detection.buffer-size).
private int bufferSize = 1024;
// Minimum interval between repeated alerts for the same anomaly key
// (anomaly.detection.alert-cooldown-ms).
private long alertCooldownMs = 300000; // 5 minutes
/** Builds and starts the real-time log processor bean. */
@Bean
public RealTimeLogProcessor logProcessor() {
RealTimeLogProcessor processor = new RealTimeLogProcessor(bufferSize);
processor.start();
return processor;
}
/**
 * Builds the alert manager, registers the default notifiers, and subscribes
 * it to the log processor as an anomaly listener.
 *
 * NOTE(review): the logProcessor() call relies on @Configuration CGLIB bean-method
 * proxying to return the singleton bean rather than a fresh instance — confirm
 * proxyBeanMethods is not disabled.
 */
@Bean
public AlertManager alertManager() {
AlertManager alertManager = new AlertManager(alertCooldownMs);
// Register notifiers
alertManager.addNotifier(new LoggingAlertNotifier());
alertManager.addNotifier(new ConsoleAlertNotifier());
// Register with log processor
logProcessor().addAnomalyListener(alertManager);
return alertManager;
}
/**
 * Stops the processor and shuts down the alert manager on context close.
 * NOTE(review): if Spring also infers destroy methods for these beans, shutdown
 * may run twice — verify stop()/shutdown() are idempotent.
 */
@PreDestroy
public void cleanup() {
logProcessor().stop();
alertManager().shutdown();
}
// Getters and setters for configuration properties
public int getBufferSize() { return bufferSize; }
public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; }
public long getAlertCooldownMs() { return alertCooldownMs; }
public void setAlertCooldownMs(long alertCooldownMs) { this.alertCooldownMs = alertCooldownMs; }
}
Usage Example
package com.anomalydetection.demo;
import com.anomalydetection.model.LogEntry;
import com.anomalydetection.processor.RealTimeLogProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.Map;
/**
 * REST endpoint for ingesting log entries into the anomaly-detection pipeline
 * and exposing simple processing statistics.
 */
@RestController
@RequestMapping("/api/logs")
public class LogIngestionController {

    @Autowired
    private RealTimeLogProcessor logProcessor;

    /**
     * Ingests one log entry supplied as a JSON object. Requires a "message"
     * field; "level", "logger", "source" and "context" are optional.
     *
     * @param logData raw JSON payload deserialized by Jackson
     * @return a map with processing status and the generated log id
     * @throws IllegalArgumentException if the payload has no "message" field
     */
    @PostMapping
    public Map<String, String> ingestLog(@RequestBody Map<String, Object> logData) {
        Object message = logData.get("message");
        if (message == null) {
            // Fail fast with a clear error instead of an opaque NullPointerException.
            throw new IllegalArgumentException("Log payload must contain a 'message' field");
        }
        LogEntry logEntry = new LogEntry();
        logEntry.setLevel(parseLevel(logData.getOrDefault("level", "INFO").toString()));
        logEntry.setMessage(message.toString());
        logEntry.setLogger(logData.getOrDefault("logger", "default").toString());
        logEntry.setTimestamp(LocalDateTime.now());
        logEntry.setSource(logData.getOrDefault("source", "unknown").toString());
        Object context = logData.get("context");
        if (context instanceof Map) {
            // Jackson deserializes nested JSON objects as Map<String, Object>.
            @SuppressWarnings("unchecked")
            Map<String, Object> contextMap = (Map<String, Object>) context;
            logEntry.setContext(contextMap);
        }
        // Hand off to the real-time pipeline.
        logProcessor.processLog(logEntry);
        return Map.of("status", "processed", "logId", logEntry.getId());
    }

    /**
     * Parses a log level name case-insensitively, falling back to INFO for
     * unknown values so a malformed payload does not produce a 500.
     */
    private static LogEntry.LogLevel parseLevel(String raw) {
        try {
            // Locale.ROOT avoids locale-sensitive case mapping (e.g. Turkish dotless i).
            return LogEntry.LogLevel.valueOf(raw.toUpperCase(java.util.Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return LogEntry.LogLevel.INFO;
        }
    }

    /** Returns counters maintained by the processor: totals and anomaly rate. */
    @GetMapping("/stats")
    public Map<String, Object> getStats() {
        return Map.of(
            "processed", logProcessor.getProcessedCount(),
            "anomalies", logProcessor.getAnomalyCount(),
            "anomalyRate", logProcessor.getAnomalyRate()
        );
    }
}
// Example simulation
package com.anomalydetection.demo;
import com.anomalydetection.model.LogEntry;
import com.anomalydetection.processor.RealTimeLogProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Random;
@Component
public class LogSimulator {
@Autowired
private RealTimeLogProcessor logProcessor;
private final Random random = new Random();
private final String[] logLevels = {"INFO", "WARN", "ERROR"};
private final String[] services = {"auth-service", "order-service", "payment-service", "user-service"};
@Scheduled(fixedRate = 100) // Generate logs every 100ms
public void generateLogs() {
// Simulate normal traffic with occasional anomalies
if (random.nextDouble() < 0.95) {
generateNormalLog();
} else {
generateAnomalousLog();
}
}
private void generateNormalLog() {
LogEntry log = new LogEntry();
log.setLevel(LogEntry.LogLevel.valueOf(logLevels[random.nextInt(2)])); // INFO or WARN
log.setLogger(services[random.nextInt(services.length)]);
log.setMessage("Normal operation completed successfully");
log.setTimestamp(LocalDateTime.now());
log.setSource(log.getLogger());
logProcessor.processLog(log);
}
private void generateAnomalousLog() {
LogEntry log = new LogEntry();
// Randomly choose anomaly type
int anomalyType = random.nextInt(4);
switch (anomalyType) {
case 0:
// Error spike
log.setLevel(LogEntry.LogLevel.ERROR);
log.setMessage("Database connection failed: Timeout after 30 seconds");
break;
case 1:
// Suspicious content
log.setLevel(LogEntry.LogLevel.WARN);
log.setMessage("Possible SQL injection attempt detected in user input");
break;
case 2:
// Unusual pattern
log.setLevel(LogEntry.LogLevel.INFO);
log.setMessage("Unusual memory allocation pattern detected");
log.setNumericValue(1000 + random.nextDouble() * 5000); // High value
break;
case 3:
// Volume anomaly (simulated by multiple rapid logs)
for (int i = 0; i < 50; i++) {
LogEntry volumeLog = new LogEntry();
volumeLog.setLevel(LogEntry.LogLevel.INFO);
volumeLog.setLogger("volume-service");
volumeLog.setMessage("High volume request processed");
volumeLog.setTimestamp(LocalDateTime.now());
volumeLog.setSource("volume-service");
logProcessor.processLog(volumeLog);
}
return;
}
log.setLogger(services[random.nextInt(services.length)]);
log.setTimestamp(LocalDateTime.now());
log.setSource(log.getLogger());
logProcessor.processLog(log);
}
}
Conclusion
This comprehensive log anomaly detection system provides:
- Real-time processing using LMAX Disruptor for high throughput
- Multiple detection algorithms: statistical, pattern-based, and sequence-based
- Configurable alerting with cooldown mechanisms to prevent alert fatigue
- Spring Boot integration for easy deployment
- Extensible architecture allowing custom detectors and notifiers
Key features include statistical outlier detection using Z-scores, pattern recognition for security threats, sequence analysis for behavioral anomalies, and a robust alerting system. The system is designed for production use with proper error handling, performance monitoring, and scalability considerations.
This implementation is designed to process thousands of log entries per second while maintaining low latency — actual throughput depends on hardware, buffer sizing, and the detectors enabled — making it suitable for enterprise-level log monitoring and security operations.