Logstash Parsing for Java Application Logs

Overview

Logstash is a powerful data processing pipeline that can parse, transform, and ship Java application logs. This guide walks through Logstash configurations for Java logs, from basic parsing to enrichment, performance tuning, and security.

Basic Logstash Configuration

1. Simple Java Log Parsing

// Example Java application with structured logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
/**
 * Example order service that logs through SLF4J and attaches per-request
 * context (transaction id, user id) to every log line via the MDC.
 */
public class JavaApplication {
    private static final Logger logger = LoggerFactory.getLogger(JavaApplication.class);

    /**
     * Validates and pays for an order, logging each step.
     *
     * <p>Any exception raised by validation or payment is logged at ERROR
     * level and not rethrown.
     *
     * @param orderId identifier of the order being processed
     * @param amount  order amount; values {@code <= 0} fail validation
     */
    public void processOrder(String orderId, double amount) {
        String transactionId = UUID.randomUUID().toString();
        // Structured logging with MDC
        org.slf4j.MDC.put("transactionId", transactionId);
        org.slf4j.MDC.put("userId", "user123");
        logger.info("Processing order: orderId={}, amount={}", orderId, amount);
        try {
            // Business logic
            validateOrder(orderId, amount);
            processPayment(orderId, amount);
            logger.info("Order processed successfully");
        } catch (Exception e) {
            // The exception is passed as the last argument in addition to the
            // two placeholder values.
            logger.error("Failed to process order: orderId={}, error={}", orderId, e.getMessage(), e);
        } finally {
            // Clear the context so values from this call do not carry over to
            // later work on the same thread.
            org.slf4j.MDC.clear();
        }
    }

    /**
     * Rejects orders with a non-positive amount.
     *
     * @throws IllegalArgumentException if {@code amount <= 0}
     */
    private void validateOrder(String orderId, double amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("Invalid amount: " + amount);
        }
        logger.debug("Order validated: orderId={}", orderId);
    }

    /** Logs the payment step; the actual payment logic is a placeholder. */
    private void processPayment(String orderId, double amount) {
        // Simulate payment processing
        logger.info("Processing payment: orderId={}, amount={}", orderId, amount);
        // Payment logic here
    }
}

Basic Logstash Configuration:

# logstash-simple.conf
# Reads the application log, accepts either JSON or plain-text lines,
# normalises the timestamp and ships events to Elasticsearch.
input {
  file {
    path => "/var/log/java-app/app.log"
    start_position => "beginning"
    # No read position is persisted, so the file is re-read from the start
    # on every restart. Suitable for testing only.
    sincedb_path => "/dev/null"
    codec => "json" # If logs are in JSON format
  }
}
filter {
  # Parse timestamp if not in JSON.
  # The json codec falls back to plain text and tags the event with
  # _jsonparsefailure when a line is not valid JSON; only those events
  # need grok. Running grok on already-parsed JSON events would tag them
  # with _grokparsefailure.
  if "_jsonparsefailure" in [tags] {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
      # Replace the original line instead of appending a second value,
      # which would turn "message" into an array.
      overwrite => [ "message" ]
    }
  }
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  # Add custom fields
  mutate {
    add_field => {
      "application" => "java-order-service"
      "environment" => "production"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "java-logs-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

2. Advanced Multi-line Log Parsing

// Java application with stack traces
/**
 * Sample class whose operations fail at random, used to produce WARN/ERROR
 * log entries that carry exceptions.
 */
public class ErrorProneApplication {
    private static final Logger logger = LoggerFactory.getLogger(ErrorProneApplication.class);

    /**
     * Runs both risky steps in order and logs whichever failure occurs;
     * nothing is rethrown to the caller.
     */
    public void performComplexOperation() {
        try {
            riskyOperation();
            anotherRiskyOperation();
        } catch (BusinessException businessFailure) {
            logger.warn("Business exception occurred", businessFailure);
        } catch (Exception unexpectedFailure) {
            // This will generate multi-line stack traces
            logger.error("Unexpected error during complex operation", unexpectedFailure);
        }
    }

    /**
     * Fails when a random draw exceeds 0.7.
     *
     * @throws BusinessException on the simulated business failure
     */
    private void riskyOperation() throws BusinessException {
        final boolean businessFailure = Math.random() > 0.7;
        if (businessFailure) {
            throw new BusinessException("Random business failure");
        }
    }

    /** Fails with an unchecked exception when a random draw exceeds 0.8. */
    private void anotherRiskyOperation() {
        final boolean technicalFailure = Math.random() > 0.8;
        if (technicalFailure) {
            throw new RuntimeException("Technical failure");
        }
    }
}
/** Checked exception thrown for business-rule failures. */
class BusinessException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the failure
     */
    public BusinessException(String message) {
        super(message);
    }

    /**
     * Creates an exception that keeps the underlying cause, so the original
     * stack trace is still available when this exception is logged.
     *
     * @param message description of the failure
     * @param cause   the exception that triggered this failure; may be null
     */
    public BusinessException(String message, Throwable cause) {
        super(message, cause);
    }
}

Multi-line Logstash Configuration:

# logstash-multiline.conf
# Joins stack-trace lines onto the log line that precedes them, then splits
# the combined event into header fields, message, and stack trace.
input {
  file {
    path => "/var/log/java-app/*.log"
    type => "java"
    start_position => "beginning"
    # Testing only: no read position is persisted across restarts.
    sincedb_path => "/dev/null"
    codec => multiline {
      # Any line that does NOT start with a timestamp belongs to the
      # previous event.
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
      auto_flush_interval => 5
    }
  }
}
filter {
  # Parse the complete multi-line event.
  # (?m) lets "." match newlines; without it GREEDYDATA stops at the end of
  # the first line and the stack trace never reaches "logmessage".
  grok {
    match => {
      "message" => "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} \[%{DATA:thread}\] %{DATA:class} - %{GREEDYDATA:logmessage}"
    }
  }
  # Extract stack traces separately
  if [logmessage] =~ /(?m)^\s+at\s/ {
    # First line is the logged message; everything after the first newline
    # is the stack trace.
    grok {
      match => {
        "logmessage" => "(?<error_message>.*?)\n(?<stack_trace>(?m:.*))"
      }
    }
    # Parse exception type. The exception class and its message are on the
    # first line of the stack trace, not in the logged message. \A anchors
    # at the very start so later "Caused by:" lines cannot match, and
    # JAVACLASS accepts dotted names such as java.lang.RuntimeException.
    # The message part is optional because some exceptions have none.
    grok {
      match => {
        "stack_trace" => "\A%{JAVACLASS:exception}(?:: (?<exception_message>[^\n]*))?"
      }
      tag_on_failure => [ "_exceptionparsefailure" ]
    }
  }
  # Parse date
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
    remove_field => [ "timestamp" ]
  }
  # Convert loglevel to uppercase for consistency (done before the lookup
  # below so the dictionary keys match).
  mutate {
    uppercase => [ "loglevel" ]
  }
  # Add severity numeric value (syslog-style: lower is more severe).
  # "mutate update" only changes a field that already exists, so it cannot
  # create "severity"; a dictionary lookup is used instead.
  translate {
    source => "loglevel"
    target => "severity"
    dictionary => {
      "FATAL" => "2"
      "ERROR" => "3"
      "WARN"  => "4"
      "INFO"  => "6"
      "DEBUG" => "7"
      "TRACE" => "7"
    }
  }
  if [severity] {
    mutate {
      convert => { "severity" => "integer" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "java-error-logs-%{+YYYY.MM.dd}"
    # document_type is intentionally not set: mapping types no longer exist
    # in Elasticsearch 8.
  }
}

Structured JSON Log Parsing

1. JSON Log Configuration in Java

// Logback configuration for JSON logs
// src/main/resources/logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Logback configuration that emits JSON log events to the console and to a
  daily-rolling file, using logstash-logback-encoder.
-->
<configuration>
  <!-- Console output: one JSON object per event. -->
  <appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="net.logstash.logback.encoder.LogstashEncoder">
      <customFields>{"application":"order-service","environment":"production"}</customFields>
      <includeContext>true</includeContext>
      <includeMdc>true</includeMdc>
      <!-- NOTE(review): caller data is typically expensive to compute;
           confirm it is needed in production. -->
      <includeCallerData>true</includeCallerData>
      <fieldNames>
        <timestamp>timestamp</timestamp>
        <message>message</message>
        <logger>logger</logger>
        <level>level</level>
        <thread>thread</thread>
        <stackTrace>stack_trace</stackTrace>
        <!-- MDC entries are nested under "mdc" rather than written inline. -->
        <mdc>mdc</mdc>
      </fieldNames>
    </encoder>
  </appender>

  <!-- File output: same field names as the console appender, built from
       individual providers. -->
  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>/var/log/java-app/application.json</file>
    <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
      <providers>
        <timestamp>
          <fieldName>timestamp</fieldName>
          <timeZone>UTC</timeZone>
        </timestamp>
        <logLevel>
          <fieldName>level</fieldName>
        </logLevel>
        <loggerName>
          <fieldName>logger</fieldName>
        </loggerName>
        <message>
          <fieldName>message</fieldName>
        </message>
        <!-- Nest MDC entries under "mdc" so this appender produces the same
             structure as the console appender; the Logstash pipelines read
             [mdc][transactionId], [mdc][userId], etc. Without a field name
             the entries are written at the top level. -->
        <mdc>
          <fieldName>mdc</fieldName>
        </mdc>
        <stackTrace>
          <fieldName>stack_trace</fieldName>
        </stackTrace>
        <threadName>
          <fieldName>thread</fieldName>
        </threadName>
        <!-- Static fields added to every event. -->
        <pattern>
          <pattern>
            {
            "application": "order-service",
            "environment": "production"
            }
          </pattern>
        </pattern>
      </providers>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>/var/log/java-app/application.%d{yyyy-MM-dd}.json</fileNamePattern>
      <maxHistory>30</maxHistory>
    </rollingPolicy>
  </appender>

  <root level="INFO">
    <appender-ref ref="JSON" />
    <appender-ref ref="FILE" />
  </root>
</configuration>

Logstash Configuration for JSON Logs:

# logstash-json.conf
# Ingests JSON log files, normalises field names, extracts order fields and
# MDC context, and indexes events (errors additionally go to their own index).
input {
  file {
    path => "/var/log/java-app/application*.json"
    codec => "json"
    start_position => "beginning"
    # Testing only: files are re-read from the start on every restart.
    sincedb_path => "/dev/null"
  }
}
filter {
  # JSON parsing is automatic with json codec
  # Clean up fields
  mutate {
    remove_field => [ "host", "@version" ]
    rename => {
      "level" => "loglevel"
      "logger" => "class_name"
    }
  }
  # Stable id for the Elasticsearch output below. Without this filter the
  # document_id reference would not resolve, every event would get the same
  # literal id, and each event would overwrite the previous one. Identical
  # content yields the same id, so re-reading a file does not create
  # duplicates.
  fingerprint {
    source => [ "timestamp", "thread", "class_name", "message" ]
    concatenate_sources => true
    method => "SHA256"
    target => "[@metadata][fingerprint]"
  }
  # Parse timestamp from JSON log
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  # Extract business-specific fields from message.
  # orderId stops at a comma or whitespace so the separator is not captured,
  # and amount is optional because some messages carry only orderId.
  if [message] =~ /orderId=/ {
    grok {
      match => {
        "message" => "orderId=(?<orderId>[^,\s]+)(?:, amount=%{NUMBER:amount:float})?"
      }
    }
  }
  # Parse MDC fields. "copy" skips keys that are absent, whereas add_field
  # with an unresolved reference would store the literal text
  # "%{[mdc][transactionId]}".
  if [mdc] {
    mutate {
      copy => {
        "[mdc][transactionId]" => "transactionId"
        "[mdc][userId]" => "userId"
      }
    }
  }
  # Add error classification
  if [loglevel] == "ERROR" {
    mutate {
      add_field => { "error_type" => "application_error" }
    }
  }
  # GeoIP for client IP (if available in MDC)
  if [mdc][clientIp] {
    geoip {
      source => "[mdc][clientIp]"
      target => "geoip"
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "java-json-logs-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][fingerprint]}"
  }
  # Send errors to separate index (in addition to the main index above)
  if [loglevel] == "ERROR" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "java-errors-%{+YYYY.MM.dd}"
    }
  }
}

Advanced Parsing Scenarios

1. Custom Log Patterns with Grok

// Custom formatted logs
/**
 * Writes pipe-delimited business events and performance metrics to the
 * application log.
 */
public class CustomLogger {
    private static final Logger logger = LoggerFactory.getLogger(CustomLogger.class);

    /**
     * Logs one business event as
     * {@code EVENT|timestamp|eventType|entityId|action|duration|userId}.
     *
     * <p>The user id is read from the MDC key {@code userId};
     * {@code "anonymous"} is logged when that key is not set. The timestamp
     * is the current time in epoch milliseconds.
     */
    public void logBusinessEvent(String eventType, String entityId, String action, long duration) {
        String effectiveUser = org.slf4j.MDC.get("userId");
        if (effectiveUser == null) {
            effectiveUser = "anonymous";
        }
        logger.info("EVENT|{}|{}|{}|{}|{}|{}",
                System.currentTimeMillis(),
                eventType,
                entityId,
                action,
                duration,
                effectiveUser);
    }

    /**
     * Logs one metric sample as
     * {@code METRIC|timestamp|metricName|value|tag1=value1,tag2=value2}.
     *
     * @param tags tag names and values, rendered as comma-separated
     *             {@code key=value} pairs in the map's iteration order
     */
    public void logPerformanceMetric(String metricName, double value, Map<String, String> tags) {
        StringBuilder renderedTags = new StringBuilder();
        boolean needsSeparator = false;
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            if (needsSeparator) {
                renderedTags.append(",");
            }
            renderedTags.append(tag.getKey() + "=" + tag.getValue());
            needsSeparator = true;
        }
        logger.info("METRIC|{}|{}|{}|{}",
                System.currentTimeMillis(),
                metricName,
                value,
                renderedTags.toString());
    }
}

Custom Grok Patterns:

# logstash-custom.conf
# Parses pipe-delimited EVENT|... and METRIC|... lines and drops everything
# else.
# NOTE(review): the conditions below require the line to START with EVENT| or
# METRIC|, i.e. the appender layout must write the bare message with no
# timestamp/level prefix - confirm against the logging configuration.
input {
  file {
    path => "/var/log/java-app/business.log"
    start_position => "beginning"
  }
}
filter {
  # Parse custom business events
  if [message] =~ /^EVENT\|/ {
    grok {
      match => {
        # grok supports only "int" and "float" conversions; ":long" is not
        # a valid type.
        "message" => "EVENT\|%{NUMBER:event_timestamp}\|%{WORD:event_type}\|%{NOTSPACE:entity_id}\|%{WORD:action}\|%{NUMBER:duration:int}\|%{NOTSPACE:user_id}"
      }
    }
    # Only tag events that actually parsed, so malformed lines are dropped
    # by the final condition instead of being indexed half-parsed.
    if "_grokparsefailure" not in [tags] {
      date {
        match => [ "event_timestamp", "UNIX_MS" ]
        target => "@timestamp"
      }
      mutate {
        remove_field => [ "event_timestamp" ]
        add_tag => [ "business_event" ]
      }
    }
  }
  # Parse custom metrics
  if [message] =~ /^METRIC\|/ {
    grok {
      match => {
        "message" => "METRIC\|%{NUMBER:metric_timestamp}\|%{WORD:metric_name}\|%{NUMBER:metric_value:float}\|%{GREEDYDATA:metric_tags}"
      }
    }
    if "_grokparsefailure" not in [tags] {
      # Parse tags into individual fields.
      # The target must not be "tags": that field is reserved by Logstash
      # for the list manipulated by add_tag and tested by "in [tags]".
      # Writing a hash there breaks the tagging and drop logic below.
      if [metric_tags] {
        kv {
          source => "metric_tags"
          field_split => ","
          value_split => "="
          target => "metric_labels"
        }
      }
      date {
        match => [ "metric_timestamp", "UNIX_MS" ]
        target => "@timestamp"
      }
      mutate {
        remove_field => [ "metric_timestamp", "metric_tags" ]
        add_tag => [ "performance_metric" ]
      }
    }
  }
  # Drop non-matching messages
  if "business_event" not in [tags] and "performance_metric" not in [tags] {
    drop { }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "business-events-%{+YYYY.MM.dd}"
  }
}

2. Log Enrichment and Correlation

// Application with distributed tracing
/**
 * Example request handler that copies distributed-tracing headers into the
 * MDC so every log line written while handling the request carries them.
 */
public class DistributedService {
    private static final Logger logger = LoggerFactory.getLogger(DistributedService.class);

    /**
     * Handles one request: populates the MDC from the tracing headers, runs
     * the request, and always clears the MDC afterwards.
     *
     * <p>Failures are logged at ERROR level and then rethrown to the caller.
     *
     * @param request incoming request; the headers {@code X-Trace-ID},
     *                {@code X-Span-ID} and {@code X-Parent-Span-ID} are read
     */
    public void handleRequest(HttpRequest request) {
        // Everything that touches the MDC sits inside try/finally so the
        // context is cleared even if the first log call fails.
        try {
            // Extract tracing headers and set them in MDC for logging
            putIfPresent("traceId", request.getHeader("X-Trace-ID"));
            putIfPresent("spanId", request.getHeader("X-Span-ID"));
            putIfPresent("parentSpanId", request.getHeader("X-Parent-Span-ID"));
            org.slf4j.MDC.put("service", "order-service");
            logger.info("Request started: method={}, path={}",
                    request.getMethod(), request.getPath());
            processRequest(request);
            logger.info("Request completed successfully");
        } catch (Exception e) {
            logger.error("Request failed", e);
            throw e;
        } finally {
            org.slf4j.MDC.clear();
        }
    }

    /**
     * Puts {@code value} into the MDC under {@code key} unless it is null.
     * SLF4J permits null MDC values only when the underlying logging
     * implementation supports them, so absent headers are skipped.
     */
    private static void putIfPresent(String key, String value) {
        if (value != null) {
            org.slf4j.MDC.put(key, value);
        }
    }

    /** Runs the request's business logic, which calls downstream services. */
    private void processRequest(HttpRequest request) {
        // Business logic that might call other services
        logger.debug("Processing request");
        // Simulate external service call
        callInventoryService();
        callPaymentService();
    }

    /** Placeholder for the inventory-service call. */
    private void callInventoryService() {
        logger.info("Calling inventory service");
        // HTTP call with tracing headers
    }

    /** Placeholder for the payment-service call. */
    private void callPaymentService() {
        logger.info("Calling payment service");
        // HTTP call with tracing headers
    }
}

Enrichment Configuration:

# logstash-enrichment.conf
# Collects JSON logs from several services, lifts tracing context out of the
# MDC, enriches events with service metadata and emits a completed-request
# rate metric.
input {
  file {
    path => "/var/log/java-app/distributed.log"
    codec => "json"
  }
  # Also read from other services
  file {
    path => "/var/log/inventory-service/*.log"
    codec => "json"
    type => "inventory"
  }
  file {
    path => "/var/log/payment-service/*.log"
    codec => "json"
    type => "payment"
  }
}
filter {
  # Common processing for all logs
  mutate {
    add_field => {
      "[@metadata][received_at]" => "%{@timestamp}"
    }
  }
  # Extract trace information.
  # "copy" skips keys that are absent (e.g. parentSpanId on a root span);
  # add_field would store the unresolved text "%{[mdc][parentSpanId]}".
  if [mdc] {
    mutate {
      copy => {
        "[mdc][traceId]" => "traceId"
        "[mdc][spanId]" => "spanId"
        "[mdc][parentSpanId]" => "parentSpanId"
        "[mdc][service]" => "service"
      }
    }
  }
  # Add geo information for client IP
  if [mdc][clientIp] {
    geoip {
      source => "[mdc][clientIp]"
      target => "geoip"
    }
  }
  # Lookup service information
  if [service] {
    translate {
      # source/target replace the deprecated field/destination options.
      source => "service"
      target => "service_info"
      dictionary => {
        "order-service" => '{"team":"orders","sla":"99.9","criticality":"high"}'
        "inventory-service" => '{"team":"inventory","sla":"99.5","criticality":"medium"}'
        "payment-service" => '{"team":"payments","sla":"99.99","criticality":"high"}'
      }
    }
    # Parse service info JSON
    if [service_info] {
      json {
        source => "service_info"
        target => "service_metadata"
      }
    }
  }
  # Count completed requests. A meter reports event counts and rates, not
  # durations, and it is keyed by a fixed name: keying it by traceId would
  # create one meter per request.
  if [message] =~ /Request completed/ and [traceId] {
    metrics {
      meter => [ "requests_completed" ]
      add_tag => "metric"
      clear_interval => 10
    }
  }
}
output {
  if "metric" in [tags] {
    # Metrics output
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "service-metrics-%{+YYYY.MM.dd}"
    }
  } else {
    # Correlated logs: query by the traceId/spanId fields.
    # No document_id is set: every log line in a span shares the same
    # traceId-spanId pair, so using it as the id would make each line
    # overwrite the previous one.
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "correlated-logs-%{+YYYY.MM.dd}"
    }
  }
}

Performance and Monitoring

1. High-Volume Log Processing

# logstash-performance.conf
# Pipeline tuned for a high-volume JSON log: cheap field extraction, early
# dropping of unwanted events, and sampling of INFO traffic.
input {
  file {
    path => "/var/log/java-app/high-volume.log"
    codec => "json"
    # Performance tuning
    sincedb_write_interval => 15
    stat_interval => 1
    discover_interval => 15
    max_open_files => 1000
    # file_completed_action is not set: it only applies in "read" mode, and
    # "delete" would remove the application's log file.
  }
}
filter {
  # Efficient grok patterns.
  # Values stop at a comma or whitespace so separators are not captured.
  grok {
    break_on_match => false
    match => {
      "message" => [
        "orderId=(?<orderId>[^,\s]+)",
        "userId=(?<userId>[^,\s]+)",
        "duration=%{NUMBER:duration:float}"
      ]
    }
    tag_on_failure => [] # Don't tag failures to avoid overhead
  }
  # Use dissect for better performance than grok
  if [type] == "performance" {
    dissect {
      mapping => {
        "message" => "PERF|%{timestamp}|%{operation}|%{duration_ms}|%{result}"
      }
      convert_datatype => {
        duration_ms => "int"
        timestamp => "int"
      }
    }
  }
  # Drop debug logs in production
  if [loglevel] == "DEBUG" and [environment] == "production" {
    drop { }
  }
  # Sample logs if volume is too high (keep 10%).
  # Sampling is done by the drop filter's "percentage" option; the prune
  # filter has no such option and rejects the setting.
  if [loglevel] == "INFO" {
    drop {
      percentage => 90
    }
    # Keep only the listed fields on the surviving INFO events.
    # NOTE(review): this also removes fields extracted above (orderId,
    # userId, duration) - confirm that is intended.
    prune {
      whitelist_names => [ "^@timestamp", "^loglevel", "^message", "error_.*" ]
    }
  }
}
output {
  # Batch writes for performance.
  # Bulk size and flush timing come from pipeline.batch.size and
  # pipeline.batch.delay in logstash.yml; the old flush_size and
  # idle_flush_time options are obsolete and cause a configuration error.
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "java-logs-%{+YYYY.MM.dd}"
    retry_initial_interval => 1
    retry_max_interval => 60
  }
  # Dead letter file for events that failed parsing.
  # The grok filter above is configured not to tag failures, so dissect
  # failures are included here as well.
  if "_grokparsefailure" in [tags] or "_dateparsefailure" in [tags] or "_dissectfailure" in [tags] {
    file {
      path => "/var/log/logstash/dead_letter_queue/%{+YYYY-MM-dd}.log"
      codec => line { format => "%{message}" }
    }
  }
}

2. Monitoring Logstash Performance

# logstash-monitoring.conf
# Polls the local Logstash node stats API, extracts pipeline and grok
# statistics, and alerts when the grok success rate is low.
# NOTE(review): the field paths below assume the monitored pipeline id is
# "main" (the default) - adjust if pipelines.yml names it differently.
input {
  # Internal Logstash metrics
  http_poller {
    urls => {
      logstash_metrics => "http://localhost:9600/_node/stats"
    }
    request_timeout => 10
    # "schedule" replaces the obsolete "interval" option.
    schedule => { every => "30s" }
    codec => "json"
  }
}
filter {
  # Extract relevant metrics
  mutate {
    add_field => {
      "timestamp" => "%{[@timestamp]}"
    }
  }
  # Pipeline metrics. The stats API nests these under pipelines.<id>.
  # "copy" keeps the values numeric; add_field would turn them into strings.
  if [pipelines][main][events] {
    mutate {
      copy => {
        "[pipelines][main][events][in]" => "events_in"
        "[pipelines][main][events][out]" => "events_out"
        "[pipelines][main][events][filtered]" => "events_filtered"
        "[pipelines][main][events][duration_in_millis]" => "events_duration"
      }
    }
  }
  # Plugin metrics
  if [pipelines][main][plugins][filters] {
    # Sum the statistics of all grok filters and derive a success rate in
    # the range 0..1. The counters are cumulative since Logstash started.
    ruby {
      code => '
        filters = event.get("[pipelines][main][plugins][filters]")
        if filters.is_a?(Array)
          matches = 0
          failures = 0
          duration = 0
          filters.each do |filter|
            next unless filter["name"] == "grok"
            matches += filter["matches"].to_i
            failures += filter["failures"].to_i
            duration += (filter["events"] || {})["duration_in_millis"].to_i
          end
          total = matches + failures
          event.set("grok_events_duration", duration)
          event.set("grok_matches_ok", matches)
          event.set("grok_success_rate", matches.to_f / total) if total > 0
        end
      '
    }
  }
  # Calculate throughput
  metrics {
    meter => "events_throughput"
    add_tag => "logstash_metrics"
    clear_interval => 60
  }
}
output {
  # Send metrics to monitoring system
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-metrics-%{+YYYY.MM.dd}"
  }
  # Alert on high error rates. The comparison uses the computed rate;
  # grok_matches_ok is a raw count and cannot be compared with 0.9.
  if [grok_success_rate] and [grok_success_rate] < 0.9 {
    email {
      # Placeholder address - replace with the real alert recipient.
      to => "ops@example.com"
      subject => "Logstash Grok Parse Failure Rate High"
      body => "Grok parse success rate: %{grok_success_rate}"
    }
  }
}

Security and Compliance

1. Sensitive Data Handling

# logstash-security.conf
# Strips or masks sensitive data before indexing and routes events that
# mention sensitive terms to a separate, authenticated cluster.
input {
  file {
    path => "/var/log/java-app/secure.log"
    codec => "json"
  }
}
filter {
  # Remove sensitive fields
  mutate {
    remove_field => [
      "password",
      "credit_card",
      "ssn",
      "auth_token",
      "api_key"
    ]
  }
  # Hash user identifiers for privacy
  if [userId] {
    fingerprint {
      source => "userId"
      target => "user_hash"
      method => "SHA256"
      # The key is read from the environment (or the Logstash keystore)
      # so it is not committed with the pipeline configuration.
      key => "${FINGERPRINT_KEY}"
      base64encode => true
    }
    mutate {
      remove_field => "userId"
    }
  }
  # Mask email addresses and SSN-shaped numbers in messages.
  # The email pattern starts at the local part; a leading "@" would require
  # "@user@host" and never match a real address.
  # NOTE(review): if config.support_escapes is enabled in logstash.yml,
  # each backslash below must be doubled.
  mutate {
    gsub => [
      "message", "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "***@***.***",
      "message", "\b\d{3}-\d{2}-\d{4}\b", "***-**-****"
    ]
  }
  # Classify log sensitivity
  if [message] =~ /(password|credit_card|ssn)/i {
    mutate {
      add_field => { "sensitivity" => "high" }
      add_tag => [ "pii" ]
    }
  }
}
output {
  # Route sensitive logs to secure storage
  if "pii" in [tags] {
    elasticsearch {
      hosts => ["secure-elasticsearch:9200"]
      index => "secure-logs-%{+YYYY.MM.dd}"
      user => "secure_user"
      password => "${ES_SECURE_PASSWORD}"
      ssl => true
      ssl_certificate_verification => true
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "normal-logs-%{+YYYY.MM.dd}"
    }
  }
}

Docker and Kubernetes Integration

1. Containerized Logstash

# Dockerfile
# Logstash image carrying this project's pipelines, settings and grok patterns.
FROM docker.elastic.co/logstash/logstash:8.5.0

# Remove default configuration
RUN rm -f /usr/share/logstash/pipeline/logstash.conf

# Add custom configurations and patterns
COPY pipeline/ /usr/share/logstash/pipeline/
COPY config/ /usr/share/logstash/config/
COPY patterns/ /usr/share/logstash/patterns/

# Install plugins
RUN logstash-plugin install logstash-filter-translate \
 && logstash-plugin install logstash-filter-fingerprint

# Health check against the Logstash HTTP API on port 9600
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
  CMD curl -f http://localhost:9600/ || exit 1

# Load every pipeline file from the pipeline directory
CMD ["-f", "/usr/share/logstash/pipeline/"]

Kubernetes Configuration:

# kubernetes/logstash-configmap.yaml
# ConfigMap holding the Logstash settings file and the pipeline definition.
# Nesting is significant in YAML: both files are block scalars under "data".
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    config.support_escapes: true
    pipeline.workers: 2
    pipeline.batch.size: 125
  logstash.conf: |
    input {
      beats {
        port => 5044
        host => "0.0.0.0"
      }
    }
    filter {
      # Parse Java JSON logs
      if [fields][app] == "java-app" {
        json {
          source => "message"
        }
        date {
          match => [ "timestamp", "ISO8601" ]
          target => "@timestamp"
        }
        mutate {
          remove_field => [ "timestamp" ]
        }
      }
    }
    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "java-logs-%{+YYYY.MM.dd}"
      }
    }

2. Filebeat to Logstash

# filebeat.yml
# Ships the Java application's log files to Logstash.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/java-app/*.log
    # Custom fields are kept under "fields" (fields_under_root is not set)
    # because the Logstash pipeline selects events with [fields][app].
    fields:
      app: java-app
      environment: production
    # JSON decoding is left to Logstash (json filter on "message").
    # Decoding here as well would replace "message" with the inner log text
    # and make the Logstash json filter fail.

output.logstash:
  hosts: ["logstash:5044"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

Troubleshooting Common Issues

1. Debug Configuration

# logstash-debug.conf
# Feeds one sample line through the grok pattern and prints the event before
# and after parsing.
input {
  generator {
    # Plain-text sample in the "<timestamp> <level> <message>" layout that
    # the grok pattern below expects.
    message => "2023-01-01T12:00:00.000Z INFO Test message orderId=123"
    count => 1
  }
}
filter {
  # Debug: log event at each step.
  # stdout is an output plugin and cannot be used inside the filter block,
  # so a ruby filter prints the event (including @metadata) instead.
  ruby {
    code => 'puts "before grok:", event.to_hash_with_metadata.inspect'
  }
  # Test grok patterns
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
    # Replace the raw line with the parsed message instead of appending.
    overwrite => [ "message" ]
    tag_on_failure => ["_grokparsefailure"]
  }
  # Debug: log after grok
  ruby {
    code => 'puts "after grok:", event.to_hash_with_metadata.inspect'
  }
}
output {
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
  file {
    path => "/tmp/logstash-debug.log"
    codec => rubydebug
  }
}

2. Common Grok Patterns for Java Logs

# patterns/java-log-patterns
# Timestamp with either "T" or a space between date and time; zone optional.
JAVA_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:%{MINUTE}:%{SECOND}%{ISO8601_TIMEZONE}?
# Optionally package-qualified class name. Each segment follows Java
# identifier rules: letters, digits, "_" and "$", not starting with a digit.
# Hyphens are not allowed; unqualified names such as "Main" are accepted.
JAVA_CLASS (?:[a-zA-Z_$][a-zA-Z0-9_$]*\.)*[a-zA-Z_$][a-zA-Z0-9_$]*
# Thread name including the surrounding square brackets.
JAVA_THREAD \[[^\]]+\]
# Usage in Logstash
filter {
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => {
      "message" => "%{JAVA_TIMESTAMP:timestamp} %{LOGLEVEL:loglevel} %{JAVA_THREAD:thread} %{JAVA_CLASS:class} - %{GREEDYDATA:message}"
    }
    # Replace the raw line with the parsed message; without this the
    # capture is appended and "message" becomes an array.
    overwrite => [ "message" ]
  }
}

This comprehensive guide covers various aspects of Logstash parsing for Java logs, from basic configurations to advanced distributed tracing and security considerations. The key is to match your Logstash configuration with your Java application's logging patterns and requirements.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper