Fluent Bit is a lightweight, high-performance log processor and forwarder that enables centralized logging in distributed systems. For Java applications, Fluent Bit provides an efficient way to collect, process, and forward logs to various destinations like Elasticsearch, Kafka, S3, or other monitoring systems.
In this guide, we'll explore how to integrate Fluent Bit with Java applications to create robust, scalable logging pipelines.
Why Fluent Bit for Java Logging?
- Lightweight: Minimal resource footprint compared to other log forwarders
- High Performance: Built in C, optimized for high-throughput logging
- Flexible Input/Output: Support for multiple log sources and destinations
- Rich Processing: Powerful filtering and transformation capabilities
- Kubernetes Native: Excellent support for containerized environments
- Unified Logging: Consistent log processing across microservices
Part 1: Fluent Bit Architecture Overview
1.1 Fluent Bit Components
Java Application  →  Log Files / Fluentd Forwarder  →  Fluent Bit  →  Output Destinations
       ↓                        ↓                          ↓                 ↓
    Logback               TCP/HTTP Input               Filters         Elasticsearch
    Log4j2                File Input                   Parsers         Kafka
    Custom Appenders      Systemd Input                                S3
                                                                       CloudWatch
                                                                       Splunk
1.2 Project Structure
java-fluentbit-app/
├── src/
│   ├── main/
│   │   ├── java/com/example/
│   │   └── resources/
│   │       ├── logback-spring.xml
│   │       └── application.yaml
│   └── test/
│       └── java/com/example/
├── docker/
│   ├── Dockerfile
│   └── fluent-bit/
│       ├── fluent-bit.conf
│       ├── parsers.conf
│       └── filters.conf
├── kubernetes/
│   ├── deployment.yaml
│   └── fluent-bit-config.yaml
└── docker-compose.yml
Part 2: Dependencies and Setup
2.1 Maven Dependencies
<!-- pom.xml -->
<properties>
<logback.version>1.4.11</logback.version>
<logstash.version>7.4</logstash.version>
<jackson.version>2.15.2</jackson.version>
<micrometer.version>1.11.5</micrometer.version>
</properties>
<dependencies>
<!-- Logback with Logstash Encoder -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>${logstash.version}</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
Part 3: Logback Configuration for Fluent Bit
3.1 Structured JSON Logging
<?xml version="1.0" encoding="UTF-8"?>
<!-- src/main/resources/logback-spring.xml -->
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!-- Custom JSON Layout for Fluent Bit -->
<appender name="JSON_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/application.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/application.%d{yyyy-MM-dd}.json</fileNamePattern>
<maxHistory>7</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<logLevel/>
<loggerName/>
<message/>
<mdc/>
<arguments/>
<stackTrace>
<throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
<maxDepthPerThrowable>30</maxDepthPerThrowable>
<maxLength>2048</maxLength>
<shortenedClassNameLength>20</shortenedClassNameLength>
<rootCauseFirst>true</rootCauseFirst>
</throwableConverter>
</stackTrace>
<pattern>
<pattern>
{
"service": "java-application",
"version": "1.0.0",
"environment": "${ENVIRONMENT:-development}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!-- Console appender for development -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- TCP appender (caveat: Logback's SocketAppender transmits serialized
     Java objects, which Fluent Bit cannot parse natively; for Fluent Bit,
     prefer the JSON file or HTTP appenders above) -->
<appender name="FLUENT_BIT" class="ch.qos.logback.classic.net.SocketAppender">
<remoteHost>localhost</remoteHost>
<port>24224</port>
<reconnectionDelay>10000</reconnectionDelay>
<includeCallerData>false</includeCallerData>
</appender>
<!-- HTTP appender for Fluent Bit -->
<appender name="FLUENT_BIT_HTTP" class="com.example.logging.FluentBitHttpAppender">
<url>http://localhost:9880/java.app</url>
<connectTimeout>5000</connectTimeout>
<readTimeout>10000</readTimeout>
</appender>
<!-- Root logger -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="JSON_FILE"/>
<appender-ref ref="FLUENT_BIT_HTTP"/>
</root>
<!-- Application-specific loggers -->
<logger name="com.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="JSON_FILE"/>
<appender-ref ref="FLUENT_BIT_HTTP"/>
</logger>
<!-- Reduce noise from framework logs -->
<logger name="org.springframework" level="INFO"/>
<logger name="org.apache" level="WARN"/>
</configuration>
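With this encoder, each log event becomes one JSON object per line, which Fluent Bit's JSON parser can ingest directly. A produced line might look roughly like the following (exact field names depend on the encoder version and configured providers; `correlationId` appears only if it was put into the MDC):

```json
{
  "@timestamp": "2024-05-14T10:15:30.123Z",
  "level": "INFO",
  "logger_name": "com.example.service.OrderService",
  "message": "Order created",
  "correlationId": "8c6a6c1e-5d1a-4f2e-9d0f-1b2c3d4e5f6a",
  "service": "java-application",
  "version": "1.0.0",
  "environment": "development"
}
```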
3.2 Custom Fluent Bit HTTP Appender
// File: src/main/java/com/example/logging/FluentBitHttpAppender.java
package com.example.logging;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FluentBitHttpAppender extends AppenderBase<ILoggingEvent> {
private static final Logger logger = LoggerFactory.getLogger(FluentBitHttpAppender.class);
private String url;
private int connectTimeout = 5000;
private int readTimeout = 10000;
private int batchSize = 100;
private int batchIntervalMs = 1000;
private final ObjectMapper objectMapper = new ObjectMapper();
private final BlockingQueue<Map<String, Object>> logQueue = new LinkedBlockingQueue<>(1000);
private HttpClient httpClient;
private ScheduledExecutorService scheduler;
@Override
public void start() {
if (url == null || url.trim().isEmpty()) {
addError("URL must be set for FluentBitHttpAppender");
return;
}
// Create the client and scheduler here rather than in the constructor:
// Logback calls the setters before start(), so the configured timeouts
// and batch settings actually take effect.
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeout))
.build();
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "fluent-bit-appender");
t.setDaemon(true); // do not keep the JVM alive for log flushing
return t;
});
startBatchProcessor();
super.start();
}
@Override
protected void append(ILoggingEvent event) {
if (!isStarted()) {
return;
}
try {
Map<String, Object> logEntry = createLogEntry(event);
// Non-blocking offer; if the queue is full, the entry is dropped
boolean offered = logQueue.offer(logEntry);
if (!offered) {
logger.warn("Log queue is full, dropping log entry: {}", event.getMessage());
}
} catch (Exception e) {
addError("Failed to process log event", e);
}
}
private Map<String, Object> createLogEntry(ILoggingEvent event) {
Map<String, Object> logEntry = new HashMap<>();
// Standard fields
logEntry.put("timestamp", event.getTimeStamp());
logEntry.put("level", event.getLevel().toString());
logEntry.put("logger", event.getLoggerName());
logEntry.put("message", event.getFormattedMessage());
logEntry.put("thread", event.getThreadName());
// MDC context
if (event.getMDCPropertyMap() != null && !event.getMDCPropertyMap().isEmpty()) {
logEntry.put("mdc", event.getMDCPropertyMap());
}
// Throwable information
if (event.getThrowableProxy() != null) {
Map<String, Object> exception = new HashMap<>();
exception.put("class", event.getThrowableProxy().getClassName());
exception.put("message", event.getThrowableProxy().getMessage());
// Render stack frames as strings so Jackson produces readable output
StringBuilder trace = new StringBuilder();
for (ch.qos.logback.classic.spi.StackTraceElementProxy step : event.getThrowableProxy().getStackTraceElementProxyArray()) {
trace.append(step.getSTEAsString()).append('\n');
}
exception.put("stack_trace", trace.toString());
logEntry.put("exception", exception);
}
// Application metadata
logEntry.put("service", "java-application");
logEntry.put("environment", System.getenv().getOrDefault("ENVIRONMENT", "development"));
logEntry.put("hostname", getHostname());
return logEntry;
}
private void startBatchProcessor() {
scheduler.scheduleAtFixedRate(() -> {
if (logQueue.isEmpty()) {
return;
}
try {
flushLogs();
} catch (Exception e) {
logger.error("Error flushing logs to Fluent Bit", e);
}
}, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
}
private void flushLogs() {
// Drain up to batchSize entries from the queue in one pass
java.util.List<Map<String, Object>> records = new java.util.ArrayList<>(batchSize);
logQueue.drainTo(records, batchSize);
if (records.isEmpty()) {
return;
}
// Prepare payload for the Fluent Bit HTTP input
Map<String, Object> payload = new HashMap<>();
payload.put("tag", "java.app");
payload.put("time", System.currentTimeMillis() / 1000);
payload.put("record", records);
sendToFluentBit(payload);
}
private void sendToFluentBit(Map<String, Object> payload) {
try {
String jsonPayload = objectMapper.writeValueAsString(payload);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
.timeout(Duration.ofMillis(readTimeout))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
logger.error("Fluent Bit HTTP error: {} - {}", response.statusCode(), response.body());
}
} catch (JsonProcessingException e) {
logger.error("Failed to serialize log payload", e);
} catch (IOException e) {
logger.error("IO error sending logs to Fluent Bit", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Log sending interrupted", e);
} catch (Exception e) {
logger.error("Unexpected error sending logs to Fluent Bit", e);
}
}
private String getHostname() {
try {
return java.net.InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
return "unknown";
}
}
// Setters for configuration
public void setUrl(String url) { this.url = url; }
public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; }
public void setReadTimeout(int readTimeout) { this.readTimeout = readTimeout; }
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
public void setBatchIntervalMs(int batchIntervalMs) { this.batchIntervalMs = batchIntervalMs; }
@Override
public void stop() {
super.stop();
if (scheduler != null) {
scheduler.shutdown();
}
try {
// Flush any remaining logs before shutdown
if (!logQueue.isEmpty()) {
flushLogs();
}
} catch (Exception e) {
// Ignore errors during shutdown
}
}
}
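The queue-and-drain batching inside the appender can be exercised on its own. This standalone sketch (class and method names are illustrative, not part of the appender) shows how a `LinkedBlockingQueue` yields at most one batch per flush:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchDrainDemo {
    // Drain at most batchSize entries; mirrors the appender's flush step
    static List<String> drainBatch(BlockingQueue<String> queue, int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        for (int i = 0; i < 250; i++) {
            // offer() is non-blocking: it returns false instead of waiting when full
            queue.offer("log-" + i);
        }
        List<String> first = drainBatch(queue, 100);
        System.out.println(first.size() + " drained, " + queue.size() + " remaining");
        // prints: 100 drained, 150 remaining
    }
}
```

Because the drain is bounded, a burst of logging never produces an unbounded HTTP payload; excess entries simply wait for the next scheduled flush.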
Part 4: Fluent Bit Configuration
4.1 Fluent Bit Configuration File
# docker/fluent-bit/fluent-bit.conf
[SERVICE]
    # Run in the foreground (required in containers)
    Daemon            off
    # Log level
    Log_Level         info
    # Parser definitions live in a separate file
    Parsers_File      parsers.conf
    # External plugins (only enable if the file actually exists)
    # Plugins_File    plugins.conf
    # HTTP server for monitoring
    HTTP_Server       On
    HTTP_Listen       0.0.0.0
    HTTP_Port         2020

# Input from Java application JSON files
[INPUT]
    Name              tail
    Path              /var/log/java-app/*.json
    Parser            json
    Tag               java.app
    Refresh_Interval  5
    Skip_Long_Lines   On
    Mem_Buf_Limit     50MB

# Input from HTTP (for direct Java app logging)
[INPUT]
    Name              http
    Listen            0.0.0.0
    Port              9880
    Tag               java.http

# Input from system logs
[INPUT]
    Name              systemd
    Tag               host.system
    Systemd_Filter    _SYSTEMD_UNIT=docker.service
    Read_From_Tail    On

# Note: [PARSER] sections must be declared in the file referenced by
# Parsers_File above, not in this main configuration file. Make sure a
# "json" parser is defined there (see section 4.2).

# Parse the "log" field of Java application records as JSON
[FILTER]
    Name              parser
    Match             java.*
    Key_Name          log
    Parser            json
    Reserve_Data      On

# Add Kubernetes metadata (if running in K8s)
[FILTER]
    Name              kubernetes
    Match             java.*
    K8S-Logging.Parser   On
    K8S-Logging.Exclude  On

# Enrich logs with additional fields
[FILTER]
    Name              record_modifier
    Match             java.*
    Record            hostname ${HOSTNAME}
    Record            service_name java-application
    Record            log_type application

# Keep only INFO/WARN/ERROR entries (drops DEBUG and TRACE)
[FILTER]
    Name              grep
    Match             java.*
    Regex             level ^(INFO|WARN|ERROR)$

# Rename the level field for downstream consumers
[FILTER]
    Name              modify
    Match             java.*
    Rename            level log_level

# Output to Elasticsearch
[OUTPUT]
    Name              es
    Match             java.*
    Host              elasticsearch
    Port              9200
    Index             java-app-logs
    Retry_Limit       5
    HTTP_User         elastic
    HTTP_Passwd       your_password
    Suppress_Type_Name On
    Generate_ID       On

# Output to Kafka
[OUTPUT]
    Name              kafka
    Match             java.*
    Brokers           kafka1:9092,kafka2:9092,kafka3:9092
    Topics            java-logs
    Timestamp_Key     @timestamp
    Retry_Limit       5
    rdkafka.log.connection.close   false
    rdkafka.request.required.acks  1

# Output to Amazon S3
[OUTPUT]
    Name              s3
    Match             java.*
    bucket            my-app-logs
    region            us-east-1
    total_file_size   100M
    upload_timeout    1m
    use_put_object    On
    s3_key_format     /java-logs/%Y/%m/%d/%H/%M_%S_$UUID.gz

# Output to stdout for development
[OUTPUT]
    Name              stdout
    Match             *
    Format            json_lines

# Output to file for backup
[OUTPUT]
    Name              file
    Match             java.*
    Path              /var/log/fluent-bit/backup
    Format            template
    Template          {time} {log}
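With the http input above listening on port 9880, you can smoke-test the pipeline from plain Java; the request path becomes the tag. This sketch assumes Fluent Bit is running locally with the configuration shown (the class name `FluentBitSmokeTest` is illustrative):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FluentBitSmokeTest {
    // Build the request separately so it can be inspected without a live server
    static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/java.app")) // path maps to tag "java.app"
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"level\":\"INFO\",\"message\":\"hello from smoke test\"}";
        HttpRequest request = buildRequest("http://localhost:9880", json);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Fluent Bit replied: " + response.statusCode());
    }
}
```

If the record reaches Fluent Bit, it should appear on the stdout output configured above.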
4.2 Custom Parsers Configuration
# docker/fluent-bit/parsers.conf

# Matches the plain-text console layout from section 3.1
[PARSER]
    Name         java_multiline
    Format       regex
    Regex        ^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?<thread>.*)\] (?<level>\w+) (?<logger>\S+) - (?<message>.*)$
    Time_Key     time
    Time_Format  %Y-%m-%d %H:%M:%S.%L

# Matches individual "at ..." stack trace frames
[PARSER]
    Name         java_stacktrace
    Format       regex
    Regex        ^\s+at\s+(?<class>\S+)\..*\((?<file>\w+\.java):(?<line>\d+)\)

# JSON parser used by the tail input; Time_Key/Time_Format assume the
# ISO-8601 @timestamp field emitted by logstash-logback-encoder
[PARSER]
    Name         json
    Format       json
    Time_Key     @timestamp
    Time_Format  %Y-%m-%dT%H:%M:%S.%L%z
    Time_Keep    On
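Java's regex engine supports the same named-group syntax, so the multiline pattern can be sanity-checked against a sample line before deploying it (the class name `ParserRegexCheck` is illustrative; the sample line follows the console pattern from section 3.1):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParserRegexCheck {
    // Same structure as the java_multiline parser, with Java string escaping
    static final Pattern JAVA_LINE = Pattern.compile(
        "^(?<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) " +
        "\\[(?<thread>.*)\\] (?<level>\\w+) (?<logger>\\S+) - (?<message>.*)$");

    public static void main(String[] args) {
        String line = "2024-05-14 10:15:30.123 [main] INFO com.example.Demo - started";
        Matcher m = JAVA_LINE.matcher(line);
        if (m.matches()) {
            System.out.println(m.group("level") + " from " + m.group("logger"));
            // prints: INFO from com.example.Demo
        }
    }
}
```

A quick unit test like this catches escaping mistakes (such as an unescaped dot before the milliseconds) much earlier than a Fluent Bit restart would.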
Part 5: Docker and Kubernetes Configuration
5.1 Docker Compose Setup
# docker-compose.yml
version: '3.8'

services:
  java-app:
    build:
      context: .
      dockerfile: docker/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - JAVA_OPTS=-Xmx512m -Xms256m
    volumes:
      - java-app-logs:/app/logs
      - ./docker/fluent-bit:/fluent-bit/etc
    depends_on:
      - fluent-bit
    networks:
      - logging-network

  fluent-bit:
    image: fluent/fluent-bit:2.1.8
    ports:
      - "2020:2020"   # Monitoring
      - "9880:9880"   # HTTP input
    volumes:
      - java-app-logs:/var/log/java-app
      - ./docker/fluent-bit:/fluent-bit/etc
      - fluent-bit-data:/var/log/fluent-bit
    environment:
      - FLUENT_BIT_CONFIG_FILE=/fluent-bit/etc/fluent-bit.conf
    networks:
      - logging-network

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    networks:
      - logging-network

  kibana:
    image: docker.elastic.co/kibana/kibana:8.9.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - logging-network

volumes:
  java-app-logs:
  fluent-bit-data:

networks:
  logging-network:
    driver: bridge
5.2 Kubernetes Configuration
# kubernetes/fluent-bit-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: default
data:
  fluent-bit.conf: |
    [SERVICE]
        Daemon        off
        Log_Level     info
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    [INPUT]
        Name              tail
        Path              /var/log/containers/*java-app*.log
        Parser            docker
        Tag               java.app
        Mem_Buf_Limit     50MB
        Skip_Long_Lines   On

    [FILTER]
        Name                 kubernetes
        Match                java.*
        Merge_Log            On
        Keep_Log             Off
        K8S-Logging.Parser   On
        K8S-Logging.Exclude  On

    [FILTER]
        Name     record_modifier
        Match    java.*
        Record   cluster_name production
        Record   app_type java

    [OUTPUT]
        Name         es
        Match        java.*
        Host         elasticsearch-logging
        Port         9200
        Index        java-app-logs
        Retry_Limit  5
        Generate_ID  On

  parsers.conf: |
    [PARSER]
        Name         docker
        Format       json
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%LZ
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: default
  labels:
    app: fluent-bit
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      containers:
        - name: fluent-bit
          image: fluent/fluent-bit:2.1.8
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluent-bit-config
              mountPath: /fluent-bit/etc/
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: fluent-bit-config
          configMap:
            name: fluent-bit-config
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
Part 6: Java Application with Structured Logging
6.1 Logging Service
// File: src/main/java/com/example/service/StructuredLoggingService.java
package com.example.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.UUID;
@Service
public class StructuredLoggingService {
private static final Logger logger = LoggerFactory.getLogger(StructuredLoggingService.class);
public void processUserRequest(String userId, String action) {
// Set correlation ID for request tracing
String correlationId = UUID.randomUUID().toString();
MDC.put("correlationId", correlationId);
MDC.put("userId", userId);
MDC.put("action", action);
try {
logger.info("Processing user request",
kv("user_id", userId),
kv("action", action),
kv("step", "start"));
// Business logic
validateUser(userId);
performAction(userId, action);
logger.info("User request completed successfully",
kv("user_id", userId),
kv("action", action),
kv("step", "end"),
kv("status", "success"));
} catch (Exception e) {
logger.error("User request failed",
kv("user_id", userId),
kv("action", action),
kv("step", "end"),
kv("status", "error"),
kv("error_message", e.getMessage()));
throw e;
} finally {
// Clear MDC
MDC.clear();
}
}
public void logBusinessMetric(String metricName, double value, Map<String, Object> dimensions) {
logger.info("Business metric recorded",
kv("metric_name", metricName),
kv("metric_value", value),
kv("metric_type", "business"),
kv("dimensions", dimensions));
}
public void logPerformanceMetrics(String operation, long durationMs, boolean success) {
logger.info("Performance metrics",
kv("operation", operation),
kv("duration_ms", durationMs),
kv("success", success),
kv("log_type", "performance"));
}
private void validateUser(String userId) {
logger.debug("Validating user", kv("user_id", userId));
// Validation logic
}
private void performAction(String userId, String action) {
logger.debug("Performing action",
kv("user_id", userId),
kv("action", action));
// Action logic
}
// Helper producing key=value arguments; logstash-logback-encoder's
// StructuredArguments.kv is a richer alternative that also emits JSON fields
private static Object kv(String key, Object value) {
return new Object() {
@Override
public String toString() {
return key + "=" + value;
}
};
}
}
6.2 Spring Boot Configuration
// File: src/main/java/com/example/config/LoggingConfig.java
package com.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.filter.CommonsRequestLoggingFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.util.UUID;
@Configuration
public class LoggingConfig implements WebMvcConfigurer {
@Bean
public CommonsRequestLoggingFilter requestLoggingFilter() {
CommonsRequestLoggingFilter loggingFilter = new CommonsRequestLoggingFilter();
loggingFilter.setIncludeClientInfo(true);
loggingFilter.setIncludeQueryString(true);
loggingFilter.setIncludePayload(true);
loggingFilter.setMaxPayloadLength(1000);
loggingFilter.setIncludeHeaders(true);
return loggingFilter;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new CorrelationIdInterceptor());
}
static class CorrelationIdInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String correlationId = request.getHeader("X-Correlation-ID");
if (correlationId == null || correlationId.trim().isEmpty()) {
correlationId = UUID.randomUUID().toString();
}
// Set correlation ID in MDC for logging
org.slf4j.MDC.put("correlationId", correlationId);
org.slf4j.MDC.put("requestId", UUID.randomUUID().toString());
org.slf4j.MDC.put("requestPath", request.getRequestURI());
org.slf4j.MDC.put("httpMethod", request.getMethod());
org.slf4j.MDC.put("userAgent", request.getHeader("User-Agent"));
// Add correlation ID to response
response.setHeader("X-Correlation-ID", correlationId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception ex) {
// Clear MDC after request completion
org.slf4j.MDC.clear();
}
}
}
6.3 Application Properties
# src/main/resources/application.yaml
spring:
  application:
    name: java-fluentbit-app

logging:
  config: classpath:logback-spring.xml
  level:
    com.example: DEBUG
    org.springframework.web: INFO
    org.hibernate: WARN
  file:
    path: logs/
    name: logs/application.json

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,loggers
  endpoint:
    health:
      show-details: always
    loggers:
      enabled: true

app:
  logging:
    fluent-bit:
      enabled: true
      url: http://localhost:9880/java.app
      batch-size: 100
      batch-interval-ms: 1000
Part 7: Testing Fluent Bit Integration
7.1 Logging Tests
// File: src/test/java/com/example/service/StructuredLoggingServiceTest.java
package com.example.service;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class StructuredLoggingServiceTest {
@Autowired
private StructuredLoggingService loggingService;
private ListAppender<ILoggingEvent> listAppender;
@BeforeEach
void setUp() {
Logger logger = (Logger) LoggerFactory.getLogger(StructuredLoggingService.class);
listAppender = new ListAppender<>();
listAppender.start();
logger.addAppender(listAppender);
}
@Test
void shouldLogStructuredUserRequest() {
// When
loggingService.processUserRequest("user123", "login");
// Then
List<ILoggingEvent> logs = listAppender.list;
assertFalse(logs.isEmpty());
// Verify the structured arguments: the kv helper renders as "key=value",
// and arguments live in the argument array, not in the formatted message
ILoggingEvent logEvent = logs.get(0);
String rendered = java.util.Arrays.toString(logEvent.getArgumentArray());
assertTrue(rendered.contains("user_id=user123"));
assertTrue(rendered.contains("action=login"));
}
@Test
void shouldLogBusinessMetrics() {
// When
loggingService.logBusinessMetric("user_registration", 1.0,
Map.of("source", "web", "campaign", "spring2024"));
// Then
List<ILoggingEvent> logs = listAppender.list;
assertFalse(logs.isEmpty());
ILoggingEvent logEvent = logs.get(0);
String rendered = java.util.Arrays.toString(logEvent.getArgumentArray());
assertTrue(rendered.contains("metric_name=user_registration"));
assertTrue(rendered.contains("metric_type=business"));
}
}
7.2 Integration Test with Fluent Bit
// File: src/test/java/com/example/integration/FluentBitIntegrationTest.java
package com.example.integration;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class FluentBitIntegrationTest {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void shouldGenerateLogsWithCorrelationId() {
// Given
String url = "http://localhost:" + port + "/api/users";
HttpHeaders headers = new HttpHeaders();
headers.set("X-Correlation-ID", "test-correlation-123");
// When
ResponseEntity<String> response = restTemplate.exchange(
url, HttpMethod.GET, new HttpEntity<>(headers), String.class);
// Then
assertEquals(200, response.getStatusCode().value());
assertNotNull(response.getHeaders().get("X-Correlation-ID"));
}
}
Best Practices for Fluent Bit with Java
- Structured Logging: Always use JSON format for machine-readable logs
- Correlation IDs: Include correlation IDs for request tracing
- Meaningful Fields: Use consistent field names across services
- Log Levels: Use appropriate log levels (DEBUG, INFO, WARN, ERROR)
- Sensitive Data: Never log passwords, tokens, or personal data
- Performance: Use asynchronous logging to avoid blocking application threads
- Monitoring: Set up alerts for error patterns and performance issues
- Retention: Configure log retention policies according to compliance requirements
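For the sensitive-data rule above, one lightweight safeguard is masking known fields before they reach the logger. A minimal sketch, assuming your logs use key=value fragments (the class name and field list are illustrative; adapt both to your own log shape):

```java
import java.util.regex.Pattern;

public class LogMasker {
    // Mask values of known sensitive keys in key=value style log fragments
    private static final Pattern SENSITIVE = Pattern.compile(
        "(?i)(password|token|secret|authorization)=\\S+");

    public static String mask(String message) {
        // $1 keeps the key as written; only the value is replaced
        return SENSITIVE.matcher(message).replaceAll("$1=****");
    }

    public static void main(String[] args) {
        String raw = "login attempt user=alice password=hunter2 token=abc123";
        System.out.println(LogMasker.mask(raw));
        // prints: login attempt user=alice password=**** token=****
    }
}
```

For JSON logs, the same idea can instead be applied per field in a custom Logback converter or a Fluent Bit lua/modify filter, so masking happens before the record leaves the host.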
Conclusion
Integrating Fluent Bit with Java applications provides a robust, scalable solution for centralized logging in distributed systems. By implementing structured logging with Fluent Bit, you can:
- Collect logs efficiently from multiple Java applications
- Process and enrich logs with additional context and metadata
- Route logs to multiple destinations (Elasticsearch, Kafka, S3, etc.)
- Maintain performance with minimal impact on application throughput
- Enable comprehensive monitoring and troubleshooting capabilities
The patterns and configurations in this guide provide a solid foundation for building production-ready logging pipelines that scale with your Java applications, whether running in Docker, Kubernetes, or traditional environments.