Centralized Logging: Implementing Fluent Bit for Java Application Logs

Article

Fluent Bit is a lightweight, high-performance log processor and forwarder that enables centralized logging in distributed systems. For Java applications, Fluent Bit provides an efficient way to collect, process, and forward logs to various destinations like Elasticsearch, Kafka, S3, or other monitoring systems.

In this guide, we'll explore how to integrate Fluent Bit with Java applications to create robust, scalable logging pipelines.

Why Fluent Bit for Java Logging?

  • Lightweight: Minimal resource footprint compared to other log forwarders
  • High Performance: Built in C, optimized for high-throughput logging
  • Flexible Input/Output: Support for multiple log sources and destinations
  • Rich Processing: Powerful filtering and transformation capabilities
  • Kubernetes Native: Excellent support for containerized environments
  • Unified Logging: Consistent log processing across microservices

Part 1: Fluent Bit Architecture Overview

1.1 Fluent Bit Components

Java Application → Log Files/Fluentd Forwarder → Fluent Bit → Output Destinations
↓                    ↓                       ↓           ↓
Logback           TCP/HTTP Input           Filters      Elasticsearch
Log4j2            File Input               Parsers      Kafka
Custom Appenders  Systemd Input                        S3
CloudWatch
Splunk

1.2 Project Structure

java-fluentbit-app/
├── src/
│   ├── main/
│   │   ├── java/com/example/
│   │   └── resources/
│   │       ├── logback-spring.xml
│   │       └── application.yaml
│   └── test/
│       └── java/com/example/
├── docker/
│   ├── Dockerfile
│   └── fluent-bit/
│       ├── fluent-bit.conf
│       ├── parsers.conf
│       └── filters.conf
├── kubernetes/
│   ├── deployment.yaml
│   └── fluent-bit-config.yaml
└── docker-compose.yml

Part 2: Dependencies and Setup

2.1 Maven Dependencies

<!-- pom.xml -->
<properties>
<logback.version>1.4.11</logback.version>
<logstash.version>7.4</logstash.version>
<jackson.version>2.15.2</jackson.version>
<micrometer.version>1.11.5</micrometer.version>
</properties>
<dependencies>
<!-- Logback with Logstash Encoder -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>${logstash.version}</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>

Part 3: Logback Configuration for Fluent Bit

3.1 Structured JSON Logging

<!-- src/main/resources/logback-spring.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!-- Custom JSON Layout for Fluent Bit -->
<appender name="JSON_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/application.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/application.%d{yyyy-MM-dd}.json</fileNamePattern>
<maxHistory>7</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<logLevel/>
<loggerName/>
<message/>
<mdc/>
<arguments/>
<stackTrace>
<throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
<maxDepthPerThrowable>30</maxDepthPerThrowable>
<maxLength>2048</maxLength>
<shortenedClassNameLength>20</shortenedClassNameLength>
<rootCauseFirst>true</rootCauseFirst>
</throwableConverter>
</stackTrace>
<pattern>
<pattern>
{
"service": "java-application",
"version": "1.0.0",
"environment": "${ENVIRONMENT:-development}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!-- Console appender for development -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Fluentd/Fluent Bit TCP appender -->
<appender name="FLUENT_BIT" class="ch.qos.logback.classic.net.SocketAppender">
<remoteHost>localhost</remoteHost>
<port>24224</port>
<reconnectionDelay>10000</reconnectionDelay>
<includeCallerData>false</includeCallerData>
</appender>
<!-- HTTP appender for Fluent Bit -->
<appender name="FLUENT_BIT_HTTP" class="com.example.logging.FluentBitHttpAppender">
<url>http://localhost:9880/java.app</url>
<connectTimeout>5000</connectTimeout>
<readTimeout>10000</readTimeout>
</appender>
<!-- Root logger -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="JSON_FILE"/>
<appender-ref ref="FLUENT_BIT_HTTP"/>
</root>
<!-- Application-specific loggers -->
<logger name="com.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="JSON_FILE"/>
<appender-ref ref="FLUENT_BIT_HTTP"/>
</logger>
<!-- Reduce noise from framework logs -->
<logger name="org.springframework" level="INFO"/>
<logger name="org.apache" level="WARN"/>
</configuration>

3.2 Custom Fluent Bit HTTP Appender

// File: src/main/java/com/example/logging/FluentBitHttpAppender.java
package com.example.logging;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FluentBitHttpAppender extends AppenderBase<ILoggingEvent> {
private static final Logger logger = LoggerFactory.getLogger(FluentBitHttpAppender.class);
private String url;
private int connectTimeout = 5000;
private int readTimeout = 10000;
private int batchSize = 100;
private int batchIntervalMs = 1000;
private final ObjectMapper objectMapper = new ObjectMapper();
private final HttpClient httpClient;
private final BlockingQueue<Map<String, Object>> logQueue;
private final ScheduledExecutorService scheduler;
public FluentBitHttpAppender() {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeout))
.build();
this.logQueue = new LinkedBlockingQueue<>(1000);
this.scheduler = Executors.newSingleThreadScheduledExecutor();
startBatchProcessor();
}
@Override
public void start() {
if (url == null || url.trim().isEmpty()) {
addError("URL must be set for FluentBitHttpAppender");
return;
}
super.start();
}
@Override
protected void append(ILoggingEvent event) {
if (!isStarted()) {
return;
}
try {
Map<String, Object> logEntry = createLogEntry(event);
// Non-blocking offer, if queue is full, log will be dropped
boolean offered = logQueue.offer(logEntry);
if (!offered) {
logger.warn("Log queue is full, dropping log entry: {}", event.getMessage());
}
} catch (Exception e) {
addError("Failed to process log event", e);
}
}
private Map<String, Object> createLogEntry(ILoggingEvent event) {
Map<String, Object> logEntry = new HashMap<>();
// Standard fields
logEntry.put("timestamp", event.getTimeStamp());
logEntry.put("level", event.getLevel().toString());
logEntry.put("logger", event.getLoggerName());
logEntry.put("message", event.getFormattedMessage());
logEntry.put("thread", event.getThreadName());
// MDC context
if (event.getMDCPropertyMap() != null && !event.getMDCPropertyMap().isEmpty()) {
logEntry.put("mdc", event.getMDCPropertyMap());
}
// Throwable information
if (event.getThrowableProxy() != null) {
Map<String, Object> exception = new HashMap<>();
exception.put("class", event.getThrowableProxy().getClassName());
exception.put("message", event.getThrowableProxy().getMessage());
exception.put("stack_trace", event.getThrowableProxy().getStackTraceElementProxyArray());
logEntry.put("exception", exception);
}
// Application metadata
logEntry.put("service", "java-application");
logEntry.put("environment", System.getenv().getOrDefault("ENVIRONMENT", "development"));
logEntry.put("hostname", getHostname());
return logEntry;
}
private void startBatchProcessor() {
scheduler.scheduleAtFixedRate(() -> {
if (logQueue.isEmpty()) {
return;
}
try {
flushLogs();
} catch (Exception e) {
logger.error("Error flushing logs to Fluent Bit", e);
}
}, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
}
private void flushLogs() {
if (logQueue.isEmpty()) {
return;
}
Map<String, Object> batch = new HashMap<>();
batch.put("tag", "java.app");
// Collect up to batchSize logs
Map<String, Object>[] records = new Map[batchSize];
int count = 0;
while (count < batchSize && !logQueue.isEmpty()) {
Map<String, Object> logEntry = logQueue.poll();
if (logEntry != null) {
records[count] = logEntry;
count++;
}
}
if (count == 0) {
return;
}
// Prepare payload in Fluent Bit format
Map<String, Object> payload = new HashMap<>();
payload.put("tag", "java.app");
payload.put("time", System.currentTimeMillis() / 1000);
payload.put("record", java.util.Arrays.copyOf(records, count));
sendToFluentBit(payload);
}
private void sendToFluentBit(Map<String, Object> payload) {
try {
String jsonPayload = objectMapper.writeValueAsString(payload);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
.timeout(Duration.ofMillis(readTimeout))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() >= 400) {
logger.error("Fluent Bit HTTP error: {} - {}", response.statusCode(), response.body());
}
} catch (JsonProcessingException e) {
logger.error("Failed to serialize log payload", e);
} catch (IOException e) {
logger.error("IO error sending logs to Fluent Bit", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Log sending interrupted", e);
} catch (Exception e) {
logger.error("Unexpected error sending logs to Fluent Bit", e);
}
}
private String getHostname() {
try {
return java.net.InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
return "unknown";
}
}
// Setters for configuration
public void setUrl(String url) { this.url = url; }
public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; }
public void setReadTimeout(int readTimeout) { this.readTimeout = readTimeout; }
public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
public void setBatchIntervalMs(int batchIntervalMs) { this.batchIntervalMs = batchIntervalMs; }
@Override
public void stop() {
super.stop();
scheduler.shutdown();
try {
// Flush remaining logs before shutdown
if (!logQueue.isEmpty()) {
flushLogs();
}
} catch (Exception e) {
// Ignore errors during shutdown
}
}
}

Part 4: Fluent Bit Configuration

4.1 Fluent Bit Configuration File

# docker/fluent-bit/fluent-bit.conf
[SERVICE]
# Daemon mode
Daemon    off
# Log level
Log_Level info
# Parsers
Parsers_File parsers.conf
# Plugins
Plugins_File plugins.conf
# HTTP Server for monitoring
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
# Input from Java application JSON files
[INPUT]
Name        tail
Path        /var/log/java-app/*.json
Parser      json
Tag         java.app
Refresh_Interval 5
Skip_Long_Lines On
Mem_Buf_Limit 50MB
# Input from HTTP (for direct Java app logging)
[INPUT]
Name        http
Listen      0.0.0.0
Port        9880
Tag         java.http
# Input from system logs
[INPUT]
Name        systemd
Tag         host.system
Systemd_Filter _SYSTEMD_UNIT=docker.service
Read_From_Tail On
# Parse JSON logs
[PARSER]
Name        json
Format      json
Time_Key    time
Time_Format %d/%b/%Y:%H:%M:%S %z
Time_Keep   On
# Filter for Java application logs
[FILTER]
Name        parser
Match       java.*
Key_Name    log
Parser      json
Reserve_Data On
# Add Kubernetes metadata (if running in K8s)
[FILTER]
Name          kubernetes
Match         java.*
K8S-Logging.Parser On
K8S-Logging.Exclude On
# Enrich logs with additional fields
[FILTER]
Name          record_modifier
Match         java.*
Record        hostname ${HOSTNAME}
Record        service_name java-application
Record        log_type application
# Sample and filter (only 10% of DEBUG logs)
[FILTER]
Name          grep
Match         java.*
Regex         level ^(INFO|WARN|ERROR)$
# Modify log level to uppercase
[FILTER]
Name          modify
Match         java.*
Rename        level log_level
# Output to Elasticsearch
[OUTPUT]
Name            es
Match           java.*
Host            elasticsearch
Port            9200
Index           java-app-logs
Type            _doc
Retry_Limit     5
HTTP_User       elastic
HTTP_Passwd     your_password
Suppress_Type_Name On
Generate_ID     On
# Output to Kafka
[OUTPUT]
Name            kafka
Match           java.*
Brokers         kafka1:9092,kafka2:9092,kafka3:9092
Topics          java-logs
Timestamp_Key   @timestamp
Retry_Limit     5
rdkafka.log.connection.close false
rdkafka.request.required.acks 1
# Output to Amazon S3
[OUTPUT]
Name            s3
Match           java.*
bucket          my-app-logs
region          us-east-1
total_file_size 100M
upload_timeout  1m
use_put_object  On
s3_key_format   /java-logs/%Y/%m/%d/%H/%M_%S_$UUID.gz
# Output to stdout for development
[OUTPUT]
Name            stdout
Match           *
Format          json_lines
# Output to file for backup
[OUTPUT]
Name            file
Match           java.*
Path            /var/log/fluent-bit/backup
Format          template
Template        { "time": "$time", "log": $log }

4.2 Custom Parsers Configuration

# docker/fluent-bit/parsers.conf
[PARSER]
Name        java_multiline
Format      regex
Regex       ^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[(?<thread>.*)\] (?<level>\w+) (?<logger>\S+) - (?<message>.*)$
Time_Key    time
Time_Format %Y-%m-%d %H:%M:%S.%L
[PARSER]
Name        java_stacktrace
Format      regex
Regex       ^\s+at\s+(?<class>\S+)\..*(?<file>\w+\.java):(?<line>\d+)\)
Skip_Empty_Lines On
[PARSER]
Name        json_clean
Format      json
Time_Key    timestamp
Time_Format %d/%b/%Y:%H:%M:%S %z

Part 5: Docker and Kubernetes Configuration

5.1 Docker Compose Setup

# docker-compose.yml
version: '3.8'
services:
java-app:
build:
context: .
dockerfile: docker/Dockerfile
ports:
- "8080:8080"
environment:
- ENVIRONMENT=development
- JAVA_OPTS=-Xmx512m -Xms256m
volumes:
- java-app-logs:/app/logs
- ./docker/fluent-bit:/fluent-bit/etc
depends_on:
- fluent-bit
networks:
- logging-network
fluent-bit:
image: fluent/fluent-bit:2.1.8
ports:
- "2020:2020"  # Monitoring
- "9880:9880"  # HTTP input
volumes:
- java-app-logs:/var/log/java-app
- ./docker/fluent-bit:/fluent-bit/etc
- fluent-bit-data:/var/log/fluent-bit
environment:
- FLUENT_BIT_CONFIG_FILE=/fluent-bit/etc/fluent-bit.conf
networks:
- logging-network
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- logging-network
kibana:
image: docker.elastic.co/kibana/kibana:8.9.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- logging-network
volumes:
java-app-logs:
fluent-bit-data:
networks:
logging-network:
driver: bridge

5.2 Kubernetes Configuration

# kubernetes/fluent-bit-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
namespace: default
data:
fluent-bit.conf: |
[SERVICE]
Daemon    off
Log_Level info
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
[INPUT]
Name tail
Path /var/log/containers/*java-app*.log
Parser docker
Tag java.app
Mem_Buf_Limit 50MB
Skip_Long_Lines On
[FILTER]
Name kubernetes
Match java.*
Merge_Log On
Keep_Log Off
K8S-Logging.Parser On
K8S-Logging.Exclude On
[FILTER]
Name record_modifier
Match java.*
Record cluster_name production
Record app_type java
[OUTPUT]
Name es
Match java.*
Host elasticsearch-logging
Port 9200
Index java-app-logs
Retry_Limit 5
Generate_ID On
parsers.conf: |
[PARSER]
Name docker
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%LZ
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluent-bit
namespace: default
labels:
app: fluent-bit
spec:
selector:
matchLabels:
app: fluent-bit
template:
metadata:
labels:
app: fluent-bit
spec:
containers:
- name: fluent-bit
image: fluent/fluent-bit:2.1.8
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: fluent-bit-config
mountPath: /fluent-bit/etc/
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 256Mi
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: fluent-bit-config
configMap:
name: fluent-bit-config
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule

Part 6: Java Application with Structured Logging

6.1 Logging Service

// File: src/main/java/com/example/service/StructuredLoggingService.java
package com.example.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Service
public class StructuredLoggingService {
private static final Logger logger = LoggerFactory.getLogger(StructuredLoggingService.class);
public void processUserRequest(String userId, String action) {
// Set correlation ID for request tracing
String correlationId = UUID.randomUUID().toString();
MDC.put("correlationId", correlationId);
MDC.put("userId", userId);
MDC.put("action", action);
try {
logger.info("Processing user request", 
kv("user_id", userId),
kv("action", action),
kv("step", "start"));
// Business logic
validateUser(userId);
performAction(userId, action);
logger.info("User request completed successfully",
kv("user_id", userId),
kv("action", action),
kv("step", "end"),
kv("status", "success"));
} catch (Exception e) {
logger.error("User request failed",
kv("user_id", userId),
kv("action", action),
kv("step", "end"),
kv("status", "error"),
kv("error_message", e.getMessage()));
throw e;
} finally {
// Clear MDC
MDC.clear();
}
}
public void logBusinessMetric(String metricName, double value, Map<String, Object> dimensions) {
logger.info("Business metric recorded",
kv("metric_name", metricName),
kv("metric_value", value),
kv("metric_type", "business"),
kv("dimensions", dimensions));
}
public void logPerformanceMetrics(String operation, long durationMs, boolean success) {
logger.info("Performance metrics",
kv("operation", operation),
kv("duration_ms", durationMs),
kv("success", success),
kv("log_type", "performance"));
}
private void validateUser(String userId) {
logger.debug("Validating user", kv("user_id", userId));
// Validation logic
}
private void performAction(String userId, String action) {
logger.debug("Performing action", 
kv("user_id", userId),
kv("action", action));
// Action logic
}
// Helper method for structured logging
private static Object kv(String key, Object value) {
return new Object() {
@Override
public String toString() {
return key + "=" + value;
}
};
}
}

6.2 Spring Boot Configuration

// File: src/main/java/com/example/config/LoggingConfig.java
package com.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.filter.CommonsRequestLoggingFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;
@Configuration
public class LoggingConfig implements WebMvcConfigurer {
@Bean
public CommonsRequestLoggingFilter requestLoggingFilter() {
CommonsRequestLoggingFilter loggingFilter = new CommonsRequestLoggingFilter();
loggingFilter.setIncludeClientInfo(true);
loggingFilter.setIncludeQueryString(true);
loggingFilter.setIncludePayload(true);
loggingFilter.setMaxPayloadLength(1000);
loggingFilter.setIncludeHeaders(true);
return loggingFilter;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new CorrelationIdInterceptor());
}
static class CorrelationIdInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String correlationId = request.getHeader("X-Correlation-ID");
if (correlationId == null || correlationId.trim().isEmpty()) {
correlationId = UUID.randomUUID().toString();
}
// Set correlation ID in MDC for logging
org.slf4j.MDC.put("correlationId", correlationId);
org.slf4j.MDC.put("requestId", UUID.randomUUID().toString());
org.slf4j.MDC.put("requestPath", request.getRequestURI());
org.slf4j.MDC.put("httpMethod", request.getMethod());
org.slf4j.MDC.put("userAgent", request.getHeader("User-Agent"));
// Add correlation ID to response
response.setHeader("X-Correlation-ID", correlationId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, 
Object handler, Exception ex) {
// Clear MDC after request completion
org.slf4j.MDC.clear();
}
}
}

6.3 Application Properties

# src/main/resources/application.yaml
spring:
application:
name: java-fluentbit-app
logging:
config: classpath:logback-spring.xml
level:
com.example: DEBUG
org.springframework.web: INFO
org.hibernate: WARN
file:
path: logs/
name: logs/application.json
management:
endpoints:
web:
exposure:
include: health,info,metrics,loggers
endpoint:
health:
show-details: always
loggers:
enabled: true
app:
logging:
fluent-bit:
enabled: true
url: http://localhost:9880/java.app
batch-size: 100
batch-interval-ms: 1000

Part 7: Testing Fluent Bit Integration

7.1 Logging Tests

// File: src/test/java/com/example/service/StructuredLoggingServiceTest.java
package com.example.service;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class StructuredLoggingServiceTest {
@Autowired
private StructuredLoggingService loggingService;
private ListAppender<ILoggingEvent> listAppender;
@BeforeEach
void setUp() {
Logger logger = (Logger) LoggerFactory.getLogger(StructuredLoggingService.class);
listAppender = new ListAppender<>();
listAppender.start();
logger.addAppender(listAppender);
}
@Test
void shouldLogStructuredUserRequest() {
// When
loggingService.processUserRequest("user123", "login");
// Then
List<ILoggingEvent> logs = listAppender.list;
assertFalse(logs.isEmpty());
// Verify structured logging
ILoggingEvent logEvent = logs.get(0);
String logMessage = logEvent.getFormattedMessage();
assertTrue(logMessage.contains("user_id=user123"));
assertTrue(logMessage.contains("action=login"));
assertTrue(logMessage.contains("correlationId"));
}
@Test
void shouldLogBusinessMetrics() {
// When
loggingService.logBusinessMetric("user_registration", 1.0, 
Map.of("source", "web", "campaign", "spring2024"));
// Then
List<ILoggingEvent> logs = listAppender.list;
assertFalse(logs.isEmpty());
ILoggingEvent logEvent = logs.get(0);
assertTrue(logEvent.getFormattedMessage().contains("metric_name=user_registration"));
assertTrue(logEvent.getFormattedMessage().contains("metric_type=business"));
}
}

7.2 Integration Test with Fluent Bit

// File: src/test/java/com/example/integration/FluentBitIntegrationTest.java
package com.example.integration;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class FluentBitIntegrationTest {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void shouldGenerateLogsWithCorrelationId() {
// Given
String url = "http://localhost:" + port + "/api/users";
HttpHeaders headers = new HttpHeaders();
headers.set("X-Correlation-ID", "test-correlation-123");
// When
ResponseEntity<String> response = restTemplate.exchange(
url, HttpMethod.GET, new HttpEntity<>(headers), String.class);
// Then
assertEquals(200, response.getStatusCodeValue());
assertNotNull(response.getHeaders().get("X-Correlation-ID"));
}
}

Best Practices for Fluent Bit with Java

  1. Structured Logging: Always use JSON format for machine-readable logs
  2. Correlation IDs: Include correlation IDs for request tracing
  3. Meaningful Fields: Use consistent field names across services
  4. Log Levels: Use appropriate log levels (DEBUG, INFO, WARN, ERROR)
  5. Sensitive Data: Never log passwords, tokens, or personal data
  6. Performance: Use asynchronous logging to avoid blocking application threads
  7. Monitoring: Set up alerts for error patterns and performance issues
  8. Retention: Configure log retention policies according to compliance requirements

Conclusion

Integrating Fluent Bit with Java applications provides a robust, scalable solution for centralized logging in distributed systems. By implementing structured logging with Fluent Bit, you can:

  • Collect logs efficiently from multiple Java applications
  • Process and enrich logs with additional context and metadata
  • Route logs to multiple destinations (Elasticsearch, Kafka, S3, etc.)
  • Maintain performance with minimal impact on application throughput
  • Enable comprehensive monitoring and troubleshooting capabilities

The patterns and configurations in this guide provide a solid foundation for building production-ready logging pipelines that scale with your Java applications, whether running in Docker, Kubernetes, or traditional environments.

Pyroscope Profiling in Java
Explains how to use Pyroscope for continuous profiling in Java applications, helping developers analyze CPU and memory usage patterns to improve performance and identify bottlenecks.
https://macronepal.com/blog/pyroscope-profiling-in-java/

OpenTelemetry Metrics in Java: Comprehensive Guide
Provides a complete guide to collecting and exporting metrics in Java using OpenTelemetry, including counters, histograms, gauges, and integration with monitoring tools. (MACRO NEPAL)
https://macronepal.com/blog/opentelemetry-metrics-in-java-comprehensive-guide/

OTLP Exporter in Java: Complete Guide for OpenTelemetry
Explains how to configure OTLP exporters in Java to send telemetry data such as traces, metrics, and logs to monitoring systems using HTTP or gRPC protocols. (MACRO NEPAL)
https://macronepal.com/blog/otlp-exporter-in-java-complete-guide-for-opentelemetry/

Thanos Integration in Java: Global View of Metrics
Explains how to integrate Thanos with Java monitoring systems to create a scalable global metrics view across multiple Prometheus instances.

https://macronepal.com/blog/thanos-integration-in-java-global-view-of-metrics

Time Series with InfluxDB in Java: Complete Guide (Version 2)
Explains how to manage time-series data using InfluxDB in Java applications, including storing, querying, and analyzing metrics data.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide-2

Time Series with InfluxDB in Java: Complete Guide
Provides an overview of integrating InfluxDB with Java for time-series data handling, including monitoring applications and managing performance metrics.

https://macronepal.com/blog/time-series-with-influxdb-in-java-complete-guide

Implementing Prometheus Remote Write in Java (Version 2)
Explains how to configure Java applications to send metrics data to Prometheus-compatible systems using the remote write feature for scalable monitoring.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide-2

Implementing Prometheus Remote Write in Java: Complete Guide
Provides instructions for sending metrics from Java services to Prometheus servers, enabling centralized monitoring and real-time analytics.

https://macronepal.com/blog/implementing-prometheus-remote-write-in-java-a-complete-guide

Building a TileServer GL in Java: Vector and Raster Tile Server
Explains how to build a TileServer GL in Java for serving vector and raster map tiles, useful for geographic visualization and mapping applications.

https://macronepal.com/blog/building-a-tileserver-gl-in-java-vector-and-raster-tile-server

Indoor Mapping in Java
Explains how to create indoor mapping systems in Java, including navigation inside buildings, spatial data handling, and visualization techniques.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper