Distributed Logging with ELK Stack in Java

The ELK Stack (Elasticsearch, Logstash, Kibana) is a powerful solution for centralized logging in distributed systems. When combined with Java applications, it provides real-time log aggregation, search, analysis, and visualization capabilities.

ELK Stack Architecture Overview

Java Apps ──(TCP/JSON, or via Beats)──▶ Logstash ──▶ Elasticsearch ──▶ Kibana

Setting Up the ELK Stack

Docker Compose Setup

# docker-compose.yml
# NOTE(review): indentation restored — YAML is whitespace-sensitive and the
# flattened original would not parse.
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.7.0
    environment:
      - discovery.type=single-node
      # Security disabled for local development only — never do this in production.
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    networks:
      - elk

  logstash:
    image: docker.elastic.co/logstash/logstash:8.7.0
    ports:
      - "5000:5000"   # TCP input used by the Java apps (json_lines)
      - "9600:9600"   # Logstash monitoring API
    environment:
      - LS_JAVA_OPTS=-Xmx256m -Xms256m
    volumes:
      # Pipeline definitions are read from /usr/share/logstash/pipeline
      - ./logstash/config:/usr/share/logstash/pipeline
    networks:
      - elk
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.7.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    networks:
      - elk
    depends_on:
      - elasticsearch

networks:
  elk:
    driver: bridge

Logstash Configuration

# logstash/config/logstash.conf
input {
  # TCP input for Java applications (logstash-logback-encoder, json_lines)
  tcp {
    port  => 5000
    codec => json_lines
  }
  # HTTP input
  http {
    port  => 5001
    codec => json
  }
  # Filebeat / Beats input
  beats {
    port => 5044
  }
}

filter {
  # Parse the event's "timestamp" field into @timestamp when present
  date {
    match => [ "timestamp", "ISO8601" ]
  }

  # Add application metadata for downstream routing
  if [app_name] {
    mutate {
      add_field => {
        "[@metadata][app_name]"    => "%{app_name}"
        "[@metadata][environment]" => "%{environment}"
      }
    }
  }

  # Grok only events that are NOT already structured. JSON events from the
  # TCP input already carry a "level" field; grokking them unconditionally
  # (as the original did) tags every JSON event with _grokparsefailure.
  if ![level] {
    grok {
      match     => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
  }

  # User agent parsing for web requests
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "user_agent_info"
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    # document_id is intentionally omitted. The original `document_id => "%{id}"`
    # resolves to the literal string "%{id}" for any event without an "id"
    # field, so every such event overwrites the previous one in the index.
  }
  # For debugging — remove in production
  stdout {
    codec => rubydebug
  }
}

Java Application Configuration

Dependencies

<!-- Maven dependencies: logging, structured JSON encoding, metrics, web stack -->
<dependencies>
    <!-- Logback: the logging backend behind SLF4J -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.7</version>
    </dependency>
    <!-- Encoder that ships log events as JSON to Logstash over TCP -->
    <dependency>
        <groupId>net.logstash.logback</groupId>
        <artifactId>logstash-logback-encoder</artifactId>
        <version>7.3</version>
    </dependency>
    <!-- Micrometer: application metrics (counters, timers) -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
        <version>1.11.0</version>
    </dependency>
    <!-- Spring Boot web starter (versions managed by the Boot parent/BOM) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Boot Actuator: health and metrics endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

Logback Configuration

<?xml version="1.0" encoding="UTF-8"?>
<!-- src/main/resources/logback-spring.xml
     NOTE(review): the XML declaration must be the very first bytes of the
     document; the original placed this path comment before it, which is not
     well-formed XML. -->
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <!-- JSON over TCP to Logstash (port 5000, matches the tcp/json_lines input).
         LogstashTcpSocketAppender writes through an internal ring buffer, so
         logging calls do not block on the socket. -->
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>localhost:5000</destination>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <version/>
                <logLevel/>
                <loggerName/>
                <!-- Static fields attached to every event; ${VAR:-default}
                     falls back when the environment variable is unset. -->
                <pattern>
                    <pattern>
                    {
                    "app": "user-service",
                    "environment": "${ENVIRONMENT:-dev}",
                    "instance": "${HOSTNAME:-local}"
                    }
                    </pattern>
                </pattern>
                <threadName/>
                <message/>
                <throwableClassName/>
                <stackTrace/>
                <stackHash/>
                <mdc/>
                <context/>
            </providers>
        </encoder>
        <!-- Keep-alive so idle connections are not silently dropped by firewalls -->
        <keepAliveDuration>5 minutes</keepAliveDuration>
    </appender>

    <!-- Human-readable console output for local development -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Daily-rolling file output, 30 days of history retained -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>logs/application.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logs/application.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Root logger fans out to all three appenders -->
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
        <appender-ref ref="LOGSTASH"/>
    </root>

    <!-- Application-specific loggers -->
    <logger name="com.example.userservice" level="DEBUG"/>
    <logger name="org.springframework.web" level="INFO"/>
    <logger name="org.hibernate" level="WARN"/>
</configuration>

Java Application Implementation

Spring Boot Application with Structured Logging

// Application.java
/**
 * Spring Boot entry point for the user service.
 */
@SpringBootApplication
public class UserServiceApplication {

    private static final Logger logger = LoggerFactory.getLogger(UserServiceApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class, args);
        // kv(...) (StructuredArguments) pairs become JSON fields through the
        // Logstash encoder; the console pattern simply ignores them.
        String environment = System.getenv("ENVIRONMENT");
        logger.info("User Service started successfully",
                kv("port", 8080),
                kv("environment", environment));
    }
}

Controller with Comprehensive Logging

/**
 * REST endpoints for user lookup and creation, with structured logging and a
 * Micrometer request counter.
 */
@RestController
@RequestMapping("/api/users")
@Slf4j
public class UserController {

    private final UserService userService;
    private final MeterRegistry meterRegistry;
    private final Counter userRequestsCounter;

    public UserController(UserService userService, MeterRegistry meterRegistry) {
        this.userService = userService;
        this.meterRegistry = meterRegistry;
        this.userRequestsCounter = Counter.builder("user.requests")
                .description("Number of user API requests")
                .register(meterRegistry);
    }

    /**
     * Returns the user with the given id.
     *
     * @throws UserNotFoundException when no user exists for {@code id}
     */
    @GetMapping("/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id,
            @RequestHeader Map<String, String> headers) {
        userRequestsCounter.increment();
        MDC.put("userId", id.toString());
        MDC.put("correlationId", headers.getOrDefault("X-Correlation-ID", "unknown"));
        log.info("Fetching user by ID",
                kv("userId", id),
                kv("userAgent", headers.get("User-Agent")));
        try {
            User user = userService.findById(id)
                    .orElseThrow(() -> new UserNotFoundException(id));
            log.info("User found successfully",
                    kv("userId", id),
                    kv("userName", user.getName()));
            return ResponseEntity.ok(user);
        } catch (UserNotFoundException e) {
            log.warn("User not found",
                    kv("userId", id),
                    kv("error", e.getMessage()));
            throw e;
        } finally {
            // Remove only the keys this method added. The original MDC.clear()
            // also wiped entries set upstream (e.g. LoggingFilter's
            // correlationId/startTime), so the filter's request-completion log
            // lost its context.
            MDC.remove("userId");
            MDC.remove("correlationId");
        }
    }

    /**
     * Creates a new user and returns 201 Created with the persisted entity.
     */
    @PostMapping
    public ResponseEntity<User> createUser(@RequestBody @Valid User user,
            @RequestHeader Map<String, String> headers) {
        userRequestsCounter.increment();
        MDC.put("correlationId", headers.getOrDefault("X-Correlation-ID", "unknown"));
        log.info("Creating new user",
                kv("userEmail", user.getEmail()),
                kv("operation", "create"));
        try {
            User createdUser = userService.createUser(user);
            log.info("User created successfully",
                    kv("userId", createdUser.getId()),
                    kv("userEmail", createdUser.getEmail()));
            return ResponseEntity.status(HttpStatus.CREATED).body(createdUser);
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J captures the
            // full stack trace; e.getMessage() alone discards it.
            log.error("Failed to create user",
                    kv("userEmail", user.getEmail()),
                    e);
            throw e;
        } finally {
            // See getUser(): scoped removal instead of MDC.clear().
            MDC.remove("correlationId");
        }
    }
}

Service Layer with Business Logging

/**
 * Business layer for user lookup, creation and batch processing; operation
 * latency is recorded through a Micrometer {@code Timer}.
 */
@Service
@Slf4j
public class UserService {

    private final UserRepository userRepository;
    private final Timer userProcessingTimer;

    public UserService(UserRepository userRepository, MeterRegistry meterRegistry) {
        this.userRepository = userRepository;
        this.userProcessingTimer = Timer.builder("user.processing.time")
                .description("Time taken to process user operations")
                .register(meterRegistry);
    }

    /**
     * Looks up a user by id; empty when no row matches.
     */
    public Optional<User> findById(Long id) {
        return userProcessingTimer.record(() -> {
            log.debug("Searching for user in database", kv("userId", id));
            Optional<User> user = userRepository.findById(id);
            if (user.isPresent()) {
                log.debug("User found in database",
                        kv("userId", id),
                        kv("userName", user.get().getName()));
            } else {
                log.debug("User not found in database", kv("userId", id));
            }
            return user;
        });
    }

    /**
     * Persists a new user after checking email uniqueness.
     *
     * @throws DuplicateEmailException when the email is already registered
     */
    public User createUser(User user) {
        return userProcessingTimer.record(() -> {
            log.info("Starting user creation process",
                    kv("userEmail", user.getEmail()));
            // Validate email uniqueness before saving
            if (userRepository.findByEmail(user.getEmail()).isPresent()) {
                log.warn("Duplicate email attempt",
                        kv("userEmail", user.getEmail()));
                throw new DuplicateEmailException(user.getEmail());
            }
            User savedUser = userRepository.save(user);
            log.info("User saved to database",
                    kv("userId", savedUser.getId()),
                    kv("userEmail", savedUser.getEmail()));
            return savedUser;
        });
    }

    /**
     * Processes a batch of users asynchronously; per-user failures are logged
     * and do not abort the batch.
     */
    @Async
    public void processUserBatch(List<User> users) {
        log.info("Starting batch user processing",
                kv("batchSize", users.size()));
        // NOTE(review): parallelStream runs on the common ForkJoinPool, so MDC
        // context from the calling thread does not propagate to these logs —
        // confirm whether correlation ids are needed here.
        users.parallelStream().forEach(user -> {
            try {
                processSingleUser(user);
            } catch (Exception e) {
                // Keep the throwable as the last argument so the stack trace
                // is logged; the original logged only e.getMessage().
                log.error("Failed to process user in batch",
                        kv("userId", user.getId()),
                        e);
            }
        });
        log.info("Batch user processing completed",
                kv("batchSize", users.size()));
    }

    // Placeholder for the real per-user processing logic.
    private void processSingleUser(User user) {
        // Simulate processing
        log.debug("Processing user", kv("userId", user.getId()));
        // Processing logic here
    }
}

Custom Exceptions with Logging

/**
 * Signals that no user exists for the requested identifier.
 */
public class UserNotFoundException extends RuntimeException {

    private final Long userId;

    /**
     * @param userId the identifier that failed to resolve
     */
    public UserNotFoundException(Long userId) {
        super(String.format("User not found with id: %d", userId));
        this.userId = userId;
    }

    /** Returns the identifier that could not be found. */
    public Long getUserId() {
        return userId;
    }
}
/**
 * Signals an attempt to register an email address that already exists.
 */
public class DuplicateEmailException extends RuntimeException {

    private final String email;

    /**
     * @param email the address that is already registered
     */
    public DuplicateEmailException(String email) {
        super(String.format("Email already exists: %s", email));
        this.email = email;
    }

    /** Returns the conflicting email address. */
    public String getEmail() {
        return email;
    }
}

Global Exception Handler

/**
 * Maps domain exceptions to HTTP error responses and logs them with
 * structured fields.
 */
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    /** 404 for lookups of a nonexistent user. */
    @ExceptionHandler(UserNotFoundException.class)
    public ResponseEntity<ErrorResponse> handleUserNotFound(UserNotFoundException ex) {
        log.warn("User not found in request",
                kv("userId", ex.getUserId()),
                kv("errorCode", "USER_NOT_FOUND"));
        ErrorResponse error = new ErrorResponse(
                "USER_NOT_FOUND",
                ex.getMessage(),
                Instant.now()
        );
        return ResponseEntity.status(HttpStatus.NOT_FOUND).body(error);
    }

    /** 409 for attempts to register an email that already exists. */
    @ExceptionHandler(DuplicateEmailException.class)
    public ResponseEntity<ErrorResponse> handleDuplicateEmail(DuplicateEmailException ex) {
        log.warn("Duplicate email registration attempt",
                kv("userEmail", ex.getEmail()),
                kv("errorCode", "DUPLICATE_EMAIL"));
        ErrorResponse error = new ErrorResponse(
                "DUPLICATE_EMAIL",
                ex.getMessage(),
                Instant.now()
        );
        return ResponseEntity.status(HttpStatus.CONFLICT).body(error);
    }

    /**
     * 500 for anything unexpected; the response body is deliberately generic
     * while the log entry carries the details.
     */
    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleGenericException(Exception ex) {
        // Pass the exception as the last argument so the stack trace is
        // captured; logging only ex.getMessage() (as the original did) makes
        // unexpected 500s nearly undebuggable.
        log.error("Unexpected error occurred",
                kv("errorType", ex.getClass().getSimpleName()),
                ex);
        ErrorResponse error = new ErrorResponse(
                "INTERNAL_ERROR",
                "An internal server error occurred",
                Instant.now()
        );
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
    }
}
// Error Response DTO returned by GlobalExceptionHandler.
// Lombok: @Data generates getters/setters/equals/hashCode/toString;
// @AllArgsConstructor generates the (code, message, timestamp) constructor.
@Data
@AllArgsConstructor
class ErrorResponse {
// Machine-readable error code, e.g. "USER_NOT_FOUND".
private String code;
// Human-readable description safe to return to clients.
private String message;
// When the error occurred.
private Instant timestamp;
}

Advanced Logging Features

Custom Logback Encoder for Enhanced JSON

<!-- Custom JSON encoder configuration: each <provider> contributes one or
     more fields to the JSON document emitted per log event. -->
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<!-- Event time, rendered in UTC under the "@timestamp" key -->
<timestamp>
<timeZone>UTC</timeZone>
<fieldName>@timestamp</fieldName>
</timestamp>
<logLevel>
<fieldName>level</fieldName>
</logLevel>
<loggerName>
<fieldName>logger</fieldName>
</loggerName>
<!-- Static service metadata; ${VAR:-default} falls back when unset -->
<pattern>
<pattern>
{
"service": "user-service",
"version": "1.0.0",
"environment": "${ENVIRONMENT:-development}",
"host": "${HOSTNAME:-localhost}",
"thread": "%thread"
}
</pattern>
</pattern>
<message>
<fieldName>message</fieldName>
</message>
<!-- All MDC entries (correlationId, userId, ...) become top-level fields -->
<mdc/>
<stackTrace>
<fieldName>stack_trace</fieldName>
</stackTrace>
<context/>
<!-- Emit only StructuredArguments (kv(...)); skip plain format arguments -->
<arguments>
<includeNonStructuredArguments>false</includeNonStructuredArguments>
</arguments>
</providers>
</encoder>

Logging Filter for Sensitive Data

/**
 * Servlet filter that assigns a per-request correlation id via MDC and logs
 * request start/completion with method, URI, status and duration.
 */
@Component
@Slf4j
public class LoggingFilter implements Filter {
// JSON field names whose values must be masked before logging.
// NOTE(review): maskSensitiveData below is never invoked anywhere in this
// class — masking only takes effect once it is wired into request/response
// body logging (e.g. via content-caching wrappers). Verify before relying
// on it for PII protection.
private final List<String> sensitiveFields = Arrays.asList(
"password", "creditCard", "ssn", "token"
);
// Logs the request, installs MDC context, delegates down the chain, then
// logs the outcome. The finally block guarantees MDC cleanup (and timing)
// even when the chain throws.
@Override
public void doFilter(ServletRequest request, ServletResponse response, 
FilterChain chain) throws IOException, ServletException {
long startTime = System.currentTimeMillis();
// Fresh correlation id per request; downstream logs inherit it through MDC.
// NOTE(review): an X-Correlation-ID request header, if present, is ignored
// here (a new UUID always wins) — confirm that is intended.
String correlationId = UUID.randomUUID().toString();
MDC.put("correlationId", correlationId);
MDC.put("startTime", String.valueOf(startTime));
try {
log.info("HTTP Request started",
kv("method", ((HttpServletRequest) request).getMethod()),
kv("uri", ((HttpServletRequest) request).getRequestURI()),
kv("remoteAddr", request.getRemoteAddr()));
chain.doFilter(request, response);
} finally {
long duration = System.currentTimeMillis() - startTime;
HttpServletResponse httpResponse = (HttpServletResponse) response;
log.info("HTTP Request completed",
kv("method", ((HttpServletRequest) request).getMethod()),
kv("uri", ((HttpServletRequest) request).getRequestURI()),
kv("status", httpResponse.getStatus()),
kv("durationMs", duration));
MDC.clear();
}
}
// Replaces the value of each sensitive field in a JSON string with "***".
// NOTE(review): the regex is rebuilt and recompiled for every field on every
// call — hoist precompiled Patterns if this is ever used on a hot path.
// Only matches simple string-valued fields ("field":"value"); nested or
// non-string values pass through unmasked.
private String maskSensitiveData(String json) {
for (String field : sensitiveFields) {
json = json.replaceAll(
String.format("\"%s\"\\s*:\\s*\"[^\"]*\"", field),
String.format("\"%s\":\"***\"", field)
);
}
return json;
}
}

Kibana Dashboard Configuration

Sample Kibana Discover Queries

// Find errors in the last hour
{
"query": {
"bool": {
"must": [
{
"match": {
"level": "ERROR"
}
},
{
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
}
]
}
}
}
// Find slow requests
{
"query": {
"range": {
"durationMs": {
"gte": 1000
}
}
}
}
// Group by application and level
{
"size": 0,
"aggs": {
"apps": {
"terms": {
"field": "app.keyword"
},
"aggs": {
"levels": {
"terms": {
"field": "level.keyword"
}
}
}
}
}
}

Visualization JSON for Dashboard

{
"title": "Application Errors Over Time",
"type": "line",
"params": {
"type": "line",
"grid": {
"categoryLines": false,
"valueAxis": ""
},
"categoryAxes": [
{
"id": "CategoryAxis-1",
"type": "category",
"position": "bottom",
"show": true,
"style": {},
"scale": {
"type": "linear"
},
"labels": {
"show": true,
"filter": false,
"truncate": 100
}
}
],
"valueAxes": [
{
"id": "ValueAxis-1",
"name": "LeftAxis-1",
"type": "value",
"position": "left",
"show": true,
"style": {},
"scale": {
"type": "linear",
"mode": "normal"
},
"labels": {
"show": true,
"rotate": 0,
"filter": false,
"truncate": 100
}
}
],
"seriesParams": [
{
"show": true,
"type": "line",
"mode": "normal",
"data": {
"label": "Error Count",
"id": "1"
},
"valueAxis": "ValueAxis-1",
"drawLinesBetweenPoints": true,
"showCircles": true,
"interpolate": "linear"
}
]
},
"aggs": [
{
"id": "1",
"enabled": true,
"type": "count",
"schema": "metric",
"params": {}
},
{
"id": "2",
"enabled": true,
"type": "date_histogram",
"schema": "segment",
"params": {
"field": "@timestamp",
"useNormalizedEsInterval": true,
"interval": "auto",
"time_zone": "UTC",
"drop_partials": false,
"customInterval": "2h",
"min_doc_count": 1,
"extended_bounds": {}
}
},
{
"id": "3",
"enabled": true,
"type": "terms",
"schema": "group",
"params": {
"field": "level.keyword",
"size": 5,
"order": "desc",
"orderBy": "1"
}
}
]
}

Best Practices for Distributed Logging

1. Structured Logging

  • Use JSON format for all logs
  • Include correlation IDs for request tracing
  • Add contextual information (user ID, session ID, etc.)

2. Log Levels

  • ERROR: System failures and unexpected conditions
  • WARN: Recoverable issues and deprecations
  • INFO: Business processes and milestones
  • DEBUG: Detailed debugging information
  • TRACE: Very detailed tracing

3. Performance Considerations

  • Use asynchronous appenders to avoid blocking
  • Set appropriate log levels in production
  • Rotate log files regularly
  • Monitor log storage usage

4. Security

  • Never log sensitive information (passwords, tokens, PII)
  • Implement log masking for sensitive data
  • Secure log transmission with TLS
  • Control access to log data

Monitoring and Alerting

Elasticsearch Alerting Rules

{
"alert": {
"name": "High Error Rate",
"condition": {
"script": {
"source": "ctx.results[0].hits.total.value > 10",
"lang": "painless"
}
},
"actions": [
{
"type": "email",
"name": "Send Email",
"throttle_period": "15m",
"subject": "High Error Rate Alert",
"text": "Error count exceeded threshold: {{ctx.results[0].hits.total.value}}"
}
]
}
}

This comprehensive ELK Stack implementation provides a robust foundation for distributed logging in Java applications, enabling effective monitoring, debugging, and analysis across your microservices architecture.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper