Overview
Grok is a Logstash plugin that enables parsing of unstructured log data into structured, queryable formats. This guide covers implementing Grok-like pattern matching directly in Java for log processing.
Core Grok Pattern Implementation
1. Grok Pattern Engine in Java
import java.util.*;
import java.util.regex.*;
import java.util.concurrent.ConcurrentHashMap;
public class JavaGrok {

    /** Upper bound on recursive %{...} expansion; guards against cyclic pattern definitions. */
    private static final int MAX_RESOLUTION_DEPTH = 64;

    /**
     * A grok reference: %{PATTERN_NAME} or %{PATTERN_NAME:semantic_name}.
     * Compiled once, since regex compilation is comparatively expensive.
     */
    private static final Pattern PATTERN_REF =
            Pattern.compile("%\\{([A-Z0-9_]+)(?::([^}]+))?\\}");

    /** Pattern name -> raw (possibly nested) grok definition. */
    private final Map<String, String> patternDefinitions;

    /** Fully-resolved regex -> compiled Pattern, cached across match calls. */
    private final Map<String, Pattern> compiledPatterns;

    // Common Java log patterns, modeled on Logstash's stock grok pattern set.
    private static final Map<String, String> DEFAULT_PATTERNS = Map.ofEntries(
            // Basic patterns
            Map.entry("WORD", "\\b\\w+\\b"),
            Map.entry("NOTSPACE", "\\S+"),
            Map.entry("SPACE", "\\s*"),
            Map.entry("DATA", ".*?"),
            Map.entry("GREEDYDATA", ".*"),
            Map.entry("NUMBER", "(?:%{BASE10NUM})"),
            Map.entry("BASE10NUM", "(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))"),
            Map.entry("INT", "(?:[+-]?(?:[0-9]+))"),
            // Java-specific patterns
            Map.entry("JAVA_TIMESTAMP", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}"),
            Map.entry("JAVA_LOGGER", "(?:[a-zA-Z0-9-]+\\.)+[A-Za-z0-9$]+"),
            Map.entry("JAVA_CLASS", "(?:[a-zA-Z0-9-]+\\.)+[A-Za-z0-9$]+"),
            Map.entry("JAVA_METHOD", "[a-zA-Z_][a-zA-Z0-9_]*"),
            Map.entry("JAVA_EXCEPTION", "(?:[a-zA-Z0-9-]+\\.)+[A-Za-z0-9$]+(?:Exception|Error)"),
            Map.entry("JAVA_STACKTRACE", "(?m)^\\s*at %{JAVA_CLASS}\\.%{JAVA_METHOD}\\(.*\\)$"),
            Map.entry("JAVA_THREAD", "\\[.*\\]"),
            Map.entry("JAVA_LEVEL", "DEBUG|INFO|WARN|ERROR|FATAL|TRACE"),
            // Date/Time patterns
            Map.entry("YEAR", "(?>\\d\\d){1,2}"),
            Map.entry("MONTHNUM", "(?:0?[1-9]|1[0-2])"),
            Map.entry("MONTHDAY", "(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])"),
            Map.entry("HOUR", "(?:2[0123]|[01]?[0-9])"),
            Map.entry("MINUTE", "(?:[0-5][0-9])"),
            Map.entry("SECOND", "(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)"),
            Map.entry("ISO8601_TIMEZONE", "(?:Z|[+-]%{HOUR}(?::?%{MINUTE}))"),
            // Log4j/Logback patterns
            Map.entry("LOG4J_DATETIME", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}"),
            Map.entry("LOGLEVEL", "%{JAVA_LEVEL}"),
            Map.entry("LOGGER", "%{JAVA_LOGGER}"),
            // Web/HTTP patterns
            Map.entry("IP", "(?:%{IPV6}|%{IPV4})"),
            Map.entry("IPV6", "(([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))"),
            Map.entry("IPV4", "(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.]){3}(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])(?![0-9])"),
            Map.entry("HTTPMETHOD", "GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS"),
            Map.entry("URIPATH", "(?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\\-]*)+"),
            Map.entry("HTTP_VERSION", "HTTP/(?:1\\.0|1\\.1|2\\.0)"),
            Map.entry("HTTP_STATUS", "(?:[1-5][0-9]{2})"),
            Map.entry("USERAGENT", ".+?")
    );

    /** Creates a grok instance with only the built-in default patterns. */
    public JavaGrok() {
        this(DEFAULT_PATTERNS);
    }

    /**
     * Creates a grok instance with the defaults plus {@code customPatterns};
     * custom entries override defaults with the same name.
     *
     * @param customPatterns extra name -> definition entries, may be null
     */
    public JavaGrok(Map<String, String> customPatterns) {
        this.patternDefinitions = new HashMap<>(DEFAULT_PATTERNS);
        if (customPatterns != null) {
            this.patternDefinitions.putAll(customPatterns);
        }
        this.compiledPatterns = new ConcurrentHashMap<>();
    }

    /**
     * Registers (or replaces) a pattern definition. Invalidates the compiled-regex
     * cache because cached entries may embed the old definition.
     */
    public void addPattern(String name, String pattern) {
        patternDefinitions.put(name, pattern);
        compiledPatterns.clear();
    }

    /**
     * Matches {@code text} against a grok expression and returns captured fields
     * keyed by their semantic names.
     * <p>
     * {@code pattern} may be a grok expression ("%{INT:port} %{WORD:host}") or the
     * bare name of a registered pattern ("TOMCAT_ACCESS"), which is expanded to
     * "%{NAME}". The whole input must match. Returns an empty map when it does not
     * match — and also when it matches but defines no capture groups.
     *
     * @throws IllegalArgumentException if the expression references an unknown pattern
     */
    public Map<String, Object> match(String pattern, String text) {
        Map<String, String> groupToSemantic = new LinkedHashMap<>();
        Matcher matcher = compilePattern(resolveTopLevel(pattern, groupToSemantic)).matcher(text);
        if (matcher.matches()) {
            return extractGroups(matcher, groupToSemantic);
        }
        return Collections.emptyMap();
    }

    /** Like {@link #match} but collects every non-overlapping occurrence in {@code text}. */
    public List<Map<String, Object>> matchAll(String pattern, String text) {
        Map<String, String> groupToSemantic = new LinkedHashMap<>();
        Matcher matcher = compilePattern(resolveTopLevel(pattern, groupToSemantic)).matcher(text);
        List<Map<String, Object>> results = new ArrayList<>();
        while (matcher.find()) {
            results.add(extractGroups(matcher, groupToSemantic));
        }
        return results;
    }

    /** Expands a bare registered pattern name to %{NAME}, then resolves to a plain regex. */
    private String resolveTopLevel(String pattern, Map<String, String> groupToSemantic) {
        if (patternDefinitions.containsKey(pattern)) {
            pattern = "%{" + pattern + "}";
        }
        return resolvePattern(pattern, groupToSemantic, 0);
    }

    /**
     * Recursively substitutes %{NAME} / %{NAME:semantic} references with their
     * definitions, recording sanitized-group-name -> semantic-name mappings.
     *
     * @throws IllegalArgumentException on an unknown pattern or excessive nesting
     */
    private String resolvePattern(String pattern, Map<String, String> groupToSemantic, int depth) {
        if (depth > MAX_RESOLUTION_DEPTH) {
            throw new IllegalArgumentException("Pattern nesting too deep (cyclic definition?): " + pattern);
        }
        Matcher matcher = PATTERN_REF.matcher(pattern);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String patternName = matcher.group(1);
            String semanticName = matcher.group(2);
            String replacement = patternDefinitions.get(patternName);
            if (replacement == null) {
                throw new IllegalArgumentException("Unknown pattern: " + patternName);
            }
            // Recursively resolve nested references inside the definition.
            replacement = resolvePattern(replacement, groupToSemantic, depth + 1);
            if (semanticName != null) {
                // Java group names must match [a-zA-Z][a-zA-Z0-9]*; semantic names
                // such as "client_ip" are sanitized here and mapped back on extraction.
                String groupName = safeGroupName(semanticName, groupToSemantic);
                groupToSemantic.put(groupName, semanticName);
                replacement = "(?<" + groupName + ">" + replacement + ")";
            }
            // quoteReplacement: the substituted regex routinely contains '\' and '$',
            // which appendReplacement would otherwise interpret as escapes/group refs.
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    /** Derives a unique, Java-legal named-group identifier for a semantic name. */
    private String safeGroupName(String semanticName, Map<String, String> used) {
        String safe = semanticName.replaceAll("[^a-zA-Z0-9]", "");
        if (safe.isEmpty() || Character.isDigit(safe.charAt(0))) {
            safe = "g" + safe; // group names may not be empty or start with a digit
        }
        String candidate = safe;
        int suffix = 1;
        while (used.containsKey(candidate)) {
            candidate = safe + suffix++; // de-duplicate repeated semantic names
        }
        return candidate;
    }

    /** Compiles (and caches) a fully-resolved regex. DOTALL lets GREEDYDATA span newlines. */
    private Pattern compilePattern(String regex) {
        return compiledPatterns.computeIfAbsent(regex,
                k -> Pattern.compile(k, Pattern.DOTALL | Pattern.MULTILINE));
    }

    /**
     * Pulls captured values out of a successful match, keyed by the original semantic
     * names. Falls back to scanning the compiled regex for named groups, then to
     * positional "group1", "group2", ... keys when no named group captured anything.
     */
    private Map<String, Object> extractGroups(Matcher matcher, Map<String, String> groupToSemantic) {
        Map<String, Object> result = new LinkedHashMap<>();
        Collection<String> groupNames = groupToSemantic.isEmpty()
                ? extractGroupNamesFromPattern(matcher.pattern().pattern())
                : groupToSemantic.keySet();
        for (String groupName : groupNames) {
            String value = matcher.group(groupName);
            if (value != null) {
                result.put(groupToSemantic.getOrDefault(groupName, groupName), value);
            }
        }
        if (result.isEmpty()) {
            for (int i = 1; i <= matcher.groupCount(); i++) {
                String value = matcher.group(i);
                if (value != null) {
                    result.put("group" + i, value);
                }
            }
        }
        return result;
    }

    /**
     * Extracts named-group identifiers "(?&lt;name&gt;" from a regex by text scan.
     * Replaces the previous reflective Pattern#namedGroups() access, which throws
     * InaccessibleObjectException under the module system on JDK 16+.
     * (Lookbehind constructs "(?&lt;!" / "(?&lt;=" are not picked up: the character
     * after '&lt;' is not a letter.)
     */
    private Set<String> extractGroupNamesFromPattern(String pattern) {
        Set<String> groupNames = new LinkedHashSet<>();
        Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(pattern);
        while (m.find()) {
            groupNames.add(m.group(1));
        }
        return groupNames;
    }
}
2. Common Java Log Patterns
public class JavaLogPatterns {

    /**
     * Grok definitions for common Java logging frameworks, merged on top of
     * {@link JavaGrok}'s defaults.
     * <p>
     * MONTH/TIME/HTTPDATE are included because TOMCAT_ACCESS and HTTP_REQUEST
     * reference HTTPDATE (previously undefined, so those patterns threw
     * "Unknown pattern: HTTPDATE"). The framework patterns capture
     * timestamp/level/logger explicitly so {@link #createLogEvent} can read them.
     */
    public static final Map<String, String> PATTERNS = Map.ofEntries(
        // Date building blocks required by HTTPDATE
        Map.entry("MONTH",
            "\\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b"),
        Map.entry("TIME", "%{HOUR}:%{MINUTE}:%{SECOND}"),
        // e.g. 15/Jan/2024:10:30:47 +0000
        Map.entry("HTTPDATE", "%{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}"),
        // Logback pattern
        Map.entry("LOGBACK_BASIC",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} %{LOGGER:logger} - %{GREEDYDATA:message}$"),
        // Log4j pattern; \[%{DATA:thread}\] captures the thread name without brackets
        // (JAVA_THREAD itself matches "[...]", which previously doubled the brackets).
        Map.entry("LOG4J_BASIC",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{LOGGER:logger} - %{GREEDYDATA:message}$"),
        // Log4j with method name
        Map.entry("LOG4J_DETAILED",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{LOGGER:logger}\\.%{JAVA_METHOD:method}\\(.*\\) - %{GREEDYDATA:message}$"),
        // Spring Boot default pattern
        Map.entry("SPRING_BOOT",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} %{NUMBER:pid} --- \\[%{DATA:thread}\\] %{LOGGER:logger} : %{GREEDYDATA:message}$"),
        // Tomcat access log (common log format)
        Map.entry("TOMCAT_ACCESS",
            "^%{IP:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{HTTPDATE:timestamp}\\] \"%{HTTPMETHOD:method} %{URIPATH:request} %{HTTP_VERSION:http_version}\" %{HTTP_STATUS:status} %{NUMBER:bytes}$"),
        // First line of a Java stack trace
        Map.entry("JAVA_EXCEPTION_LINE",
            "^%{JAVA_EXCEPTION:exception}: %{GREEDYDATA:exception_message}$"),
        // Stack trace element ("at pkg.Class.method(File.java:123)")
        Map.entry("STACK_TRACE_ELEMENT",
            "^\\s*at %{JAVA_CLASS:class}\\.%{JAVA_METHOD:method}\\(%{NOTSPACE:file}(?::%{NUMBER:line})?\\)$"),
        // Naive JSON log pattern — assumes fixed key order; prefer a real JSON parser
        Map.entry("JSON_LOG",
            "^\\{\"timestamp\":\"%{JAVA_TIMESTAMP:timestamp}\",\"level\":\"%{LOGLEVEL:level}\",\"logger\":\"%{LOGGER:logger}\",\"message\":\"%{GREEDYDATA:message}\".*\\}$"),
        // Combined-format HTTP request log with trailing response time
        Map.entry("HTTP_REQUEST",
            "^%{IP:client_ip} - - \\[%{HTTPDATE:timestamp}\\] \"%{HTTPMETHOD:method} %{URIPATH:url} %{HTTP_VERSION:http_version}\" %{HTTP_STATUS:status} %{NUMBER:response_size} \"%{NOTSPACE:referrer}\" \"%{USERAGENT:user_agent}\" %{NUMBER:response_time}$")
    );

    private final JavaGrok grok;

    /** Creates a parser backed by a fresh grok engine with all framework patterns loaded. */
    public JavaLogPatterns() {
        this(new JavaGrok(PATTERNS));
    }

    /**
     * Wraps an existing grok instance. The framework patterns are registered on it
     * so the parse* methods work regardless of how the instance was constructed.
     */
    public JavaLogPatterns(JavaGrok grok) {
        this.grok = grok;
        PATTERNS.forEach(grok::addPattern);
    }

    /** Parses a Logback-formatted line; returns null if the line does not match. */
    public LogEvent parseLogback(String logLine) {
        // match() takes a grok expression, so registered names are wrapped in %{...}.
        return createLogEvent(grok.match("%{LOGBACK_BASIC}", logLine));
    }

    /** Parses a Log4j-formatted line; returns null if the line does not match. */
    public LogEvent parseLog4j(String logLine) {
        return createLogEvent(grok.match("%{LOG4J_BASIC}", logLine));
    }

    /** Parses a Spring Boot default-format line; returns null if the line does not match. */
    public LogEvent parseSpringBoot(String logLine) {
        return createLogEvent(grok.match("%{SPRING_BOOT}", logLine));
    }

    /** Parses a Tomcat access-log line; returns null if the line does not match. */
    public HttpRequestLog parseTomcatAccess(String logLine) {
        return createHttpRequestLog(grok.match("%{TOMCAT_ACCESS}", logLine));
    }

    /** Parses an "at pkg.Class.method(File:line)" frame; returns null on no match. */
    public StackTraceElement parseStackTraceLine(String line) {
        return createStackTraceElement(grok.match("%{STACK_TRACE_ELEMENT}", line));
    }

    /** Parses the "com.example.FooException: message" header line; null on no match. */
    public ExceptionInfo parseExceptionLine(String line) {
        return createExceptionInfo(grok.match("%{JAVA_EXCEPTION_LINE}", line));
    }

    /**
     * Fallback regex-based extraction for JSON logs with a fixed key order.
     * For real JSON logs prefer an actual JSON parser.
     */
    public Map<String, Object> parseJsonLog(String logLine) {
        return grok.match("%{JSON_LOG}", logLine);
    }

    /** Builds a LogEvent from captured fields; null when nothing matched. */
    private LogEvent createLogEvent(Map<String, Object> matches) {
        if (matches.isEmpty()) {
            return null;
        }
        return new LogEvent(
            (String) matches.get("timestamp"),
            (String) matches.get("level"),
            (String) matches.get("logger"),
            (String) matches.get("thread"),
            (String) matches.get("message"),
            (String) matches.get("method")
        );
    }

    /** Builds an HttpRequestLog from captured fields; null when nothing matched. */
    private HttpRequestLog createHttpRequestLog(Map<String, Object> matches) {
        if (matches.isEmpty()) {
            return null;
        }
        return new HttpRequestLog(
            (String) matches.get("client_ip"),
            (String) matches.get("timestamp"),
            (String) matches.get("method"),
            (String) matches.get("request"),
            (String) matches.get("http_version"),
            (String) matches.get("status"),
            parseLong(matches.get("bytes")),
            (String) matches.get("referrer"),
            (String) matches.get("user_agent")
        );
    }

    /** Builds a java.lang.StackTraceElement from captured fields; null when nothing matched. */
    private StackTraceElement createStackTraceElement(Map<String, Object> matches) {
        if (matches.isEmpty()) {
            return null;
        }
        Integer line = parseInt(matches.get("line"));
        return new StackTraceElement(
            (String) matches.get("class"),
            (String) matches.get("method"),
            (String) matches.get("file"),
            // -1 means "unknown line" per the StackTraceElement convention; avoids
            // the NPE the previous code hit when auto-unboxing a missing line number.
            line != null ? line : -1
        );
    }

    /** Builds an ExceptionInfo from captured fields; null when nothing matched. */
    private ExceptionInfo createExceptionInfo(Map<String, Object> matches) {
        if (matches.isEmpty()) {
            return null;
        }
        return new ExceptionInfo(
            (String) matches.get("exception"),
            (String) matches.get("exception_message")
        );
    }

    /** Lenient long parse: null in, unparsable in -> null out. */
    private Long parseLong(Object value) {
        if (value == null) return null;
        try {
            return Long.parseLong(value.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Lenient int parse: null in, unparsable in -> null out. */
    private Integer parseInt(Object value) {
        if (value == null) return null;
        try {
            return Integer.parseInt(value.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Immutable view of one parsed application-log line. Any field may be null. */
    public static class LogEvent {
        private final String timestamp;
        private final String level;
        private final String logger;
        private final String thread;
        private final String message;
        private final String method;

        public LogEvent(String timestamp, String level, String logger,
                        String thread, String message, String method) {
            this.timestamp = timestamp;
            this.level = level;
            this.logger = logger;
            this.thread = thread;
            this.message = message;
            this.method = method;
        }

        // Getters
        public String getTimestamp() { return timestamp; }
        public String getLevel() { return level; }
        public String getLogger() { return logger; }
        public String getThread() { return thread; }
        public String getMessage() { return message; }
        public String getMethod() { return method; }
    }

    /** Immutable view of one parsed HTTP access-log line. Any field may be null. */
    public static class HttpRequestLog {
        private final String clientIp;
        private final String timestamp;
        private final String method;
        private final String request;
        private final String httpVersion;
        private final String status;
        private final Long bytes;
        private final String referrer;
        private final String userAgent;

        public HttpRequestLog(String clientIp, String timestamp, String method,
                              String request, String httpVersion, String status,
                              Long bytes, String referrer, String userAgent) {
            this.clientIp = clientIp;
            this.timestamp = timestamp;
            this.method = method;
            this.request = request;
            this.httpVersion = httpVersion;
            this.status = status;
            this.bytes = bytes;
            this.referrer = referrer;
            this.userAgent = userAgent;
        }

        // Getters
        public String getClientIp() { return clientIp; }
        public String getTimestamp() { return timestamp; }
        public String getMethod() { return method; }
        public String getRequest() { return request; }
        public String getHttpVersion() { return httpVersion; }
        public String getStatus() { return status; }
        public Long getBytes() { return bytes; }
        public String getReferrer() { return referrer; }
        public String getUserAgent() { return userAgent; }
    }

    /** Immutable exception class/message pair from an exception header line. */
    public static class ExceptionInfo {
        private final String exceptionClass;
        private final String message;

        public ExceptionInfo(String exceptionClass, String message) {
            this.exceptionClass = exceptionClass;
            this.message = message;
        }

        // Getters
        public String getExceptionClass() { return exceptionClass; }
        public String getMessage() { return message; }
    }
}
3. Log Processor with Multiple Pattern Support
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class LogProcessor {

    private final JavaGrok grok;
    /** Detection rules, tried in order; more specific formats first. */
    private final List<LogPattern> patterns;

    /** Creates a processor with grok definitions and detection rules for common formats. */
    public LogProcessor() {
        this.grok = new JavaGrok();
        this.patterns = new ArrayList<>();
        initializeDefaultPatterns();
    }

    /**
     * Registers the grok definitions this processor matches against, plus the cheap
     * per-line pre-checks used to pick a candidate pattern. The definitions were
     * previously never registered with the engine, so every name lookup treated the
     * pattern name as literal text and always failed.
     */
    private void initializeDefaultPatterns() {
        // Grok definitions, built only from JavaGrok's default building blocks.
        grok.addPattern("LOGBACK_BASIC",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} %{JAVA_LOGGER:logger} - %{GREEDYDATA:message}$");
        grok.addPattern("LOG4J_BASIC",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{JAVA_LOGGER:logger} - %{GREEDYDATA:message}$");
        grok.addPattern("SPRING_BOOT",
            "^%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:level} %{NUMBER:pid} --- \\[%{DATA:thread}\\] %{JAVA_LOGGER:logger} : %{GREEDYDATA:message}$");
        grok.addPattern("MONTH", "\\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\b");
        grok.addPattern("HTTPDATE", "%{MONTHDAY}/%{MONTH}/%{YEAR}:%{HOUR}:%{MINUTE}:%{SECOND} %{INT}");
        grok.addPattern("TOMCAT_ACCESS",
            "^%{IP:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{HTTPDATE:timestamp}\\] \"%{HTTPMETHOD:method} %{URIPATH:request} %{HTTP_VERSION:http_version}\" %{HTTP_STATUS:status} %{NUMBER:bytes}$");
        grok.addPattern("JAVA_EXCEPTION_LINE",
            "^%{JAVA_EXCEPTION:exception}: %{GREEDYDATA:exception_message}$");
        grok.addPattern("STACK_TRACE_ELEMENT",
            "^\\s*at %{JAVA_CLASS:class}\\.%{JAVA_METHOD:method}\\(%{NOTSPACE:file}(?::%{NUMBER:line})?\\)$");

        // Detection rules: most specific timestamped formats first, so Spring Boot /
        // Log4j lines are not claimed by the plainer Logback rule.
        patterns.add(new LogPattern("SPRING_BOOT",
            "timestamp level pid thread logger message",
            line -> line.matches("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.*--- \\[.*\\].*")));
        patterns.add(new LogPattern("LOG4J_BASIC",
            "timestamp level thread logger message",
            line -> line.matches("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.*\\[.*\\].*")));
        patterns.add(new LogPattern("LOGBACK_BASIC",
            "timestamp level logger message",
            line -> line.matches("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.*")));
        patterns.add(new LogPattern("TOMCAT_ACCESS",
            "client_ip ident auth timestamp method request http_version status bytes",
            line -> line.matches("^\\d+\\.\\d+\\.\\d+\\.\\d+.*\\[.*\\].*HTTP/.*")));
        patterns.add(new LogPattern("JAVA_EXCEPTION_LINE",
            "exception exception_message",
            line -> line.matches("^[a-zA-Z0-9.]*(?:Exception|Error):.*")));
        patterns.add(new LogPattern("STACK_TRACE_ELEMENT",
            "class method file line",
            line -> line.trim().startsWith("at ")));
    }

    /**
     * Parses one line: the first pattern whose cheap condition holds AND whose grok
     * expression fully matches wins. Unrecognized lines come back with pattern name
     * "UNKNOWN" and the raw text under "raw_message".
     * <p>
     * Note: the previous per-log-line result cache was removed — it was keyed by the
     * full line text, so it grew without bound and almost never hit.
     */
    public ProcessedLog processLine(String logLine) {
        for (LogPattern pattern : patterns) {
            // Function#apply — the original called test(), which Function does not define.
            if (Boolean.TRUE.equals(pattern.getCondition().apply(logLine))) {
                Map<String, Object> matches = grok.match("%{" + pattern.getPatternName() + "}", logLine);
                if (!matches.isEmpty()) {
                    return new ProcessedLog(logLine, pattern.getPatternName(), matches);
                }
            }
        }
        // No pattern matched.
        return new ProcessedLog(logLine, "UNKNOWN", Collections.singletonMap("raw_message", logLine));
    }

    /** Parses each line independently; result order matches input order. */
    public List<ProcessedLog> processLines(List<String> logLines) {
        return logLines.stream()
            .map(this::processLine)
            .collect(Collectors.toList());
    }

    /** Adds a detection rule (tried after the built-in ones). */
    public void addPattern(LogPattern pattern) {
        patterns.add(pattern);
    }

    /**
     * Registers a grok definition under {@code name} and a detection condition for it.
     * The grok expression is registered with the engine — previously it was dropped,
     * leaving the name unresolvable at match time.
     */
    public void addPattern(String name, String pattern, Function<String, Boolean> condition) {
        grok.addPattern(name, pattern);
        addPattern(new LogPattern(name, pattern, condition));
    }

    /** Counts occurrences per log level over logs that captured a "level" field. */
    public Map<String, Long> analyzeLogLevels(List<ProcessedLog> logs) {
        return logs.stream()
            .filter(log -> log.getFields().containsKey("level"))
            .collect(Collectors.groupingBy(
                log -> log.getFields().get("level").toString(),
                Collectors.counting()
            ));
    }

    /** Counts occurrences per logger over logs that captured a "logger" field. */
    public Map<String, Long> analyzeLoggers(List<ProcessedLog> logs) {
        return logs.stream()
            .filter(log -> log.getFields().containsKey("logger"))
            .collect(Collectors.groupingBy(
                log -> log.getFields().get("logger").toString(),
                Collectors.counting()
            ));
    }

    /** Returns logs whose level is ERROR or FATAL. */
    public List<ProcessedLog> findErrors(List<ProcessedLog> logs) {
        return logs.stream()
            .filter(log -> {
                Object level = log.getFields().get("level");
                return level != null &&
                    ("ERROR".equals(level) || "FATAL".equals(level));
            })
            .collect(Collectors.toList());
    }

    /** Returns logs whose "logger" field fully matches {@code loggerPattern} (a regex). */
    public List<ProcessedLog> filterByLogger(List<ProcessedLog> logs, String loggerPattern) {
        Pattern pattern = Pattern.compile(loggerPattern);
        return logs.stream()
            .filter(log -> {
                Object logger = log.getFields().get("logger");
                return logger != null && pattern.matcher(logger.toString()).matches();
            })
            .collect(Collectors.toList());
    }

    /** Pairs a registered grok pattern name with a cheap applicability pre-check. */
    public static class LogPattern {
        private final String patternName;
        private final String description;
        // Kept as Function<String, Boolean> (not Predicate) to preserve the public
        // constructor signature existing callers use.
        private final Function<String, Boolean> condition;

        public LogPattern(String patternName, String description,
                          Function<String, Boolean> condition) {
            this.patternName = patternName;
            this.description = description;
            this.condition = condition;
        }

        // Getters
        public String getPatternName() { return patternName; }
        public String getDescription() { return description; }
        public Function<String, Boolean> getCondition() { return condition; }
    }

    /** Immutable parse result: the raw line, the winning pattern, and captured fields. */
    public static class ProcessedLog {
        private final String originalLine;
        private final String patternName;
        private final Map<String, Object> fields;
        // Wall-clock processing time, not the event time parsed from the line.
        private final long timestamp;

        public ProcessedLog(String originalLine, String patternName,
                            Map<String, Object> fields) {
            this.originalLine = originalLine;
            this.patternName = patternName;
            // Defensive copy: unmodifiableMap alone would still reflect later
            // mutations of the caller's map.
            this.fields = Collections.unmodifiableMap(new LinkedHashMap<>(fields));
            this.timestamp = System.currentTimeMillis();
        }

        // Getters
        public String getOriginalLine() { return originalLine; }
        public String getPatternName() { return patternName; }
        public Map<String, Object> getFields() { return fields; }
        public long getTimestamp() { return timestamp; }

        public Object getField(String name) {
            return fields.get(name);
        }

        public boolean hasField(String name) {
            return fields.containsKey(name);
        }
    }
}
4. Spring Boot Integration
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.function.Consumer;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class LogstashService {

    private final LogProcessor logProcessor;
    private final JavaLogPatterns javaPatterns;

    /** Optional custom grok pattern file, one "PATTERN_NAME regex" per line. */
    @Value("${app.log.patterns.file:patterns.conf}")
    private String patternsFile;

    /** Batch size hint for bulk processing. */
    @Value("${app.log.process.batch-size:1000}")
    private int batchSize;

    public LogstashService() {
        this.logProcessor = new LogProcessor();
        this.javaPatterns = new JavaLogPatterns();
    }

    /** Runs after @Value injection, so patternsFile is populated. */
    @PostConstruct
    public void init() {
        loadCustomPatterns();
    }

    /** Best-effort load of the custom pattern file; a missing file is not an error. */
    private void loadCustomPatterns() {
        try {
            Path path = Paths.get(patternsFile);
            if (Files.exists(path)) {
                parseCustomPatterns(Files.readAllLines(path));
            }
        } catch (IOException e) {
            System.err.println("Failed to load custom patterns: " + e.getMessage());
        }
    }

    /**
     * Parses "PATTERN_NAME regex" definitions; blank lines and '#' comments are skipped.
     */
    private void parseCustomPatterns(List<String> lines) {
        for (String line : lines) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            String[] parts = line.split("\\s+", 2);
            if (parts.length == 2) {
                String patternName = parts[0];
                String pattern = parts[1];
                // Lambda parameter renamed: reusing "line" shadowed the loop
                // variable above, which does not compile in Java.
                logProcessor.addPattern(patternName, pattern, candidate -> {
                    try {
                        return candidate.matches(".*" + pattern + ".*"); // simple pre-check
                    } catch (PatternSyntaxException e) {
                        // Grok syntax (%{...}) is not a valid raw regex; defer the
                        // decision to the grok engine instead of failing the rule.
                        return true;
                    }
                });
            }
        }
    }

    /** Reads the whole file into memory and parses every line. */
    public List<LogProcessor.ProcessedLog> processLogFile(String filePath) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(filePath));
        return logProcessor.processLines(lines);
    }

    /** Streams the file line-by-line, suitable for files too large to hold in memory. */
    public void processLogFileStreaming(String filePath,
                                        Consumer<LogProcessor.ProcessedLog> consumer)
            throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                consumer.accept(logProcessor.processLine(line));
            }
        }
    }

    /** Parses the file and returns summary statistics plus the error entries. */
    public Map<String, Object> analyzeLogFile(String filePath) throws IOException {
        List<LogProcessor.ProcessedLog> logs = processLogFile(filePath);
        Map<String, Long> levelCounts = logProcessor.analyzeLogLevels(logs);
        Map<String, Long> loggerCounts = logProcessor.analyzeLoggers(logs);
        List<LogProcessor.ProcessedLog> errors = logProcessor.findErrors(logs);
        return Map.of(
            "total_logs", logs.size(),
            "level_distribution", levelCounts,
            "logger_distribution", loggerCounts,
            "error_count", errors.size(),
            "errors", errors.stream()
                .map(LogProcessor.ProcessedLog::getFields)
                .collect(Collectors.toList())
        );
    }

    /** Serializes parsed logs to a JSON array file via Jackson. */
    public void exportToJson(List<LogProcessor.ProcessedLog> logs, String outputPath)
            throws IOException {
        List<Map<String, Object>> jsonLogs = logs.stream()
            .map(log -> {
                Map<String, Object> jsonLog = new HashMap<>();
                jsonLog.put("original_line", log.getOriginalLine());
                jsonLog.put("pattern", log.getPatternName());
                jsonLog.put("timestamp", log.getTimestamp());
                jsonLog.putAll(log.getFields());
                return jsonLog;
            })
            .collect(Collectors.toList());
        ObjectMapper mapper = new ObjectMapper();
        mapper.writeValue(new File(outputPath), jsonLogs);
    }

    /**
     * Tails {@code filePath}, pushing each newly appended line through the processor.
     * Blocks indefinitely; returns only if the watched directory becomes invalid.
     */
    public void watchLogFile(String filePath,
                             Consumer<LogProcessor.ProcessedLog> processor)
            throws IOException, InterruptedException {
        Path file = Paths.get(filePath).toAbsolutePath();
        Path dir = file.getParent(); // absolute path guarantees a non-null parent
        // try-with-resources: the previous version leaked the WatchService.
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
            dir.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
            long lastPosition = Files.size(file);
            while (true) {
                WatchKey key = watchService.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
                        // Advance to the position actually reached while reading rather
                        // than a fresh Files.size() call, so lines appended mid-read
                        // are neither skipped nor duplicated.
                        lastPosition = processNewLines(file.toString(), lastPosition, processor);
                    }
                }
                if (!key.reset()) {
                    return; // directory no longer watchable
                }
            }
        }
    }

    /**
     * Reads lines appended after {@code lastPosition} and returns the new read position.
     * Note: RandomAccessFile#readLine decodes bytes as ISO-8859-1.
     */
    private long processNewLines(String filePath, long lastPosition,
                                 Consumer<LogProcessor.ProcessedLog> processor)
            throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
            file.seek(lastPosition);
            String line;
            while ((line = file.readLine()) != null) {
                processor.accept(logProcessor.processLine(line));
            }
            return file.getFilePointer();
        }
    }
}
5. Advanced Pattern Matching with Custom Rules
public class AdvancedGrokProcessor {

    /**
     * Shared line parser, reused across calls — the previous version constructed a
     * fresh LogProcessor (recompiling every pattern) for each processed line.
     */
    private final LogProcessor logProcessor;
    private final List<MatchingRule> rules;

    public AdvancedGrokProcessor() {
        this.logProcessor = new LogProcessor();
        this.rules = new ArrayList<>();
        initializeRules();
    }

    /** Installs the built-in enrichment rules. */
    private void initializeRules() {
        // Rule for detecting high response times.
        rules.add(new MatchingRule(
            "HIGH_RESPONSE_TIME",
            "Response time exceeds threshold",
            fields -> {
                Object responseTime = fields.get("response_time");
                if (responseTime instanceof String) {
                    try {
                        long rt = Long.parseLong((String) responseTime);
                        return rt > 1000; // 1 second threshold
                    } catch (NumberFormatException e) {
                        return false;
                    }
                }
                return false;
            },
            // Safe: the condition guarantees response_time is non-null here,
            // so Map.of cannot throw on a null value.
            fields -> Map.of("alert", "HIGH_RESPONSE_TIME",
                "response_time", fields.get("response_time"))
        ));
        // Rule for detecting database error patterns.
        rules.add(new MatchingRule(
            "ERROR_PATTERN",
            "Multiple errors from same source",
            fields -> {
                Object level = fields.get("level");
                Object logger = fields.get("logger");
                return "ERROR".equals(level) &&
                    logger != null &&
                    logger.toString().contains("Database");
            },
            fields -> Map.of("alert", "DATABASE_ERROR",
                "logger", fields.get("logger"))
        ));
        // Rule for suspicious login activity.
        rules.add(new MatchingRule(
            "SUSPICIOUS_ACTIVITY",
            "Multiple failed login attempts",
            fields -> {
                Object message = fields.get("message");
                return message != null &&
                    message.toString().toLowerCase().contains("failed login");
            },
            fields -> Map.of("alert", "FAILED_LOGIN_ATTEMPTS")
        ));
    }

    /**
     * Parses the line with the standard patterns, then applies every rule whose
     * condition holds and collects the resulting enrichment maps.
     */
    public EnrichedLog processAndEnrich(String logLine) {
        LogProcessor.ProcessedLog processed = logProcessor.processLine(logLine);
        List<Map<String, Object>> enrichments = new ArrayList<>();
        for (MatchingRule rule : rules) {
            // Function#apply — the original called test(), which Function does not define.
            if (Boolean.TRUE.equals(rule.getCondition().apply(processed.getFields()))) {
                enrichments.add(rule.getEnrichment().apply(processed.getFields()));
            }
        }
        return new EnrichedLog(processed, enrichments);
    }

    /** Processes and enriches each line; result order matches input order. */
    public List<EnrichedLog> processWithRules(List<String> logLines) {
        return logLines.stream()
            .map(this::processAndEnrich)
            .collect(Collectors.toList());
    }

    /** Adds a custom enrichment rule, evaluated after the built-in ones. */
    public void addRule(MatchingRule rule) {
        rules.add(rule);
    }

    /** A named condition over parsed fields plus the enrichment it produces. */
    public static class MatchingRule {
        private final String name;
        private final String description;
        private final Function<Map<String, Object>, Boolean> condition;
        private final Function<Map<String, Object>, Map<String, Object>> enrichment;

        public MatchingRule(String name, String description,
                            Function<Map<String, Object>, Boolean> condition,
                            Function<Map<String, Object>, Map<String, Object>> enrichment) {
            this.name = name;
            this.description = description;
            this.condition = condition;
            this.enrichment = enrichment;
        }

        // Getters
        public String getName() { return name; }
        public String getDescription() { return description; }
        public Function<Map<String, Object>, Boolean> getCondition() { return condition; }
        public Function<Map<String, Object>, Map<String, Object>> getEnrichment() { return enrichment; }
    }

    /** A parsed log plus the enrichments produced by the matching rules. */
    public static class EnrichedLog {
        private final LogProcessor.ProcessedLog processedLog;
        private final List<Map<String, Object>> enrichments;

        public EnrichedLog(LogProcessor.ProcessedLog processedLog,
                           List<Map<String, Object>> enrichments) {
            this.processedLog = processedLog;
            this.enrichments = Collections.unmodifiableList(enrichments);
        }

        // Getters
        public LogProcessor.ProcessedLog getProcessedLog() { return processedLog; }
        public List<Map<String, Object>> getEnrichments() { return enrichments; }
        public boolean hasEnrichments() { return !enrichments.isEmpty(); }
    }
}
6. Performance-Optimized Batch Processing
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class BatchLogProcessor {

    private final LogProcessor logProcessor;
    private final ExecutorService executor;
    private final BlockingQueue<String> logQueue;
    private final AtomicLong processedCount;
    private final AtomicLong errorCount;
    /** Number of worker tasks to launch; kept equal to the pool size. */
    private final int threadCount;

    /**
     * @param queueSize   capacity of the bounded submission queue (backpressure)
     * @param threadCount worker threads processing the queue
     */
    public BatchLogProcessor(int queueSize, int threadCount) {
        this.logProcessor = new LogProcessor();
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.logQueue = new LinkedBlockingQueue<>(queueSize);
        this.processedCount = new AtomicLong(0);
        this.errorCount = new AtomicLong(0);
        this.threadCount = threadCount;
    }

    /**
     * Starts one worker loop per pool thread. Workers poll with a timeout so they
     * notice interruption and shutdown promptly.
     */
    public void startProcessing(Consumer<LogProcessor.ProcessedLog> resultHandler) {
        // Launch exactly as many workers as pool threads. The previous version used
        // availableProcessors(), silently ignoring the configured threadCount.
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String logLine = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        if (logLine != null) {
                            processLogLine(logLine, resultHandler);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        break;
                    }
                }
            });
        }
    }

    /** Enqueues one line, blocking when the queue is full (backpressure). */
    public void submitLogLine(String logLine) {
        try {
            logQueue.put(logLine);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues lines in order; blocks as needed on a full queue. */
    public void submitLogLines(List<String> logLines) {
        for (String line : logLines) {
            submitLogLine(line);
        }
    }

    /** Parses one line and hands the result to the handler; failures only bump the counter. */
    private void processLogLine(String logLine, Consumer<LogProcessor.ProcessedLog> resultHandler) {
        try {
            LogProcessor.ProcessedLog processed = logProcessor.processLine(logLine);
            processedCount.incrementAndGet();
            resultHandler.accept(processed);
        } catch (Exception e) {
            errorCount.incrementAndGet();
            // Include the cause — the previous version discarded the exception entirely.
            System.err.println("Failed to process log line: " + logLine + " (" + e + ")");
        }
    }

    /** Graceful shutdown: waits up to 60s for in-flight work, then forces termination. */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Snapshot of the counters; individually atomic, not mutually consistent. */
    public ProcessingStats getStats() {
        return new ProcessingStats(
            processedCount.get(),
            errorCount.get(),
            logQueue.size()
        );
    }

    /** Immutable counter snapshot. */
    public static class ProcessingStats {
        private final long processedCount;
        private final long errorCount;
        private final long queueSize;

        public ProcessingStats(long processedCount, long errorCount, long queueSize) {
            this.processedCount = processedCount;
            this.errorCount = errorCount;
            this.queueSize = queueSize;
        }

        // Getters
        public long getProcessedCount() { return processedCount; }
        public long getErrorCount() { return errorCount; }
        public long getQueueSize() { return queueSize; }

        /** Errors as a percentage of successfully processed lines; 0 when nothing processed. */
        public double getErrorRate() {
            return processedCount > 0 ? (double) errorCount / processedCount * 100 : 0.0;
        }
    }
}
Usage Examples
public class GrokExamples {
public static void main(String[] args) throws Exception {
// Basic usage
JavaGrok grok = new JavaGrok();
String logLine = "2024-01-15 10:30:45 INFO com.example.Service - Processing request";
Map<String, Object> result = grok.match("LOGBACK_BASIC", logLine);
System.out.println("Parsed fields: " + result);
// Using pattern processor
LogProcessor processor = new LogProcessor();
LogProcessor.ProcessedLog processed = processor.processLine(logLine);
System.out.println("Pattern: " + processed.getPatternName());
System.out.println("Fields: " + processed.getFields());
// Batch processing
List<String> logLines = Arrays.asList(
"2024-01-15 10:30:45 INFO com.example.Service - Processing request",
"2024-01-15 10:30:46 ERROR com.example.Service - Database connection failed",
"127.0.0.1 - - [15/Jan/2024:10:30:47 +0000] \"GET /api/users HTTP/1.1\" 200 1234"
);
List<LogProcessor.ProcessedLog> results = processor.processLines(logLines);
for (LogProcessor.ProcessedLog log : results) {
System.out.println(log.getPatternName() + ": " + log.getFields());
}
// Advanced processing with rules
AdvancedGrokProcessor advancedProcessor = new AdvancedGrokProcessor();
EnrichedLog enriched = advancedProcessor.processAndEnrich(logLine);
if (enriched.hasEnrichments()) {
System.out.println("Enrichments: " + enriched.getEnrichments());
}
}
}
This implementation provides a comprehensive Grok-like pattern matching system in Java, suitable for parsing Java application logs with support for custom patterns, batch processing, and real-time analysis.