Loki Log Aggregation in Java: Complete Implementation Guide

Loki is a horizontally-scalable, highly-available, multi-tenant log aggregation system inspired by Prometheus. In this article, we'll build a complete Java-based log aggregation system that integrates with Loki, including client libraries, log collectors, and management tools.

Project Setup

First, add the necessary dependencies to your pom.xml:

<dependencies>
    <!-- Apache HttpClient 5: HTTP transport for the Loki push and query APIs -->
    <dependency>
        <groupId>org.apache.httpcomponents.client5</groupId>
        <artifactId>httpclient5</artifactId>
        <version>5.2.1</version>
    </dependency>

    <!-- Jackson: JSON serialization of push requests and parsing of responses -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.15.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>

    <!-- SLF4J API: logging facade used by the client classes themselves -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>

    <!-- Micrometer (optional): metrics facade -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
        <version>1.11.3</version>
    </dependency>

    <!-- Commons Compress: compression utilities -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.23.0</version>
    </dependency>

    <!-- Commons Lang: general-purpose helpers -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.13.0</version>
    </dependency>

    <!-- JUnit 5: test scope only -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.10.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Core Implementation

1. Configuration Models

LokiConfig.java - Main configuration class

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
 * Immutable connection, batching and retry settings shared by the Loki client classes.
 *
 * <p>Instances are created through {@link Builder}. The label map handed out by
 * {@link #getDefaultLabels()} is a read-only snapshot taken when {@link Builder#build()}
 * is called, so later changes to the builder do not leak into an existing config.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class LokiConfig {
    private final String url;
    private final String tenantId;
    private final Duration timeout;
    private final int maxBatchSize;
    private final Duration batchTimeout;
    private final int maxRetries;
    private final Duration retryDelay;
    private final boolean compressionEnabled;
    private final Map<String, String> defaultLabels;
    private final AuthConfig auth;

    /**
     * Copies the builder state into an immutable config.
     *
     * @throws NullPointerException if the URL or any of the durations is {@code null}
     */
    private LokiConfig(Builder builder) {
        // Fail fast: a null URL or duration would otherwise only surface on the first request.
        this.url = java.util.Objects.requireNonNull(builder.url, "url");
        this.tenantId = builder.tenantId;
        this.timeout = java.util.Objects.requireNonNull(builder.timeout, "timeout");
        this.maxBatchSize = builder.maxBatchSize;
        this.batchTimeout = java.util.Objects.requireNonNull(builder.batchTimeout, "batchTimeout");
        this.maxRetries = builder.maxRetries;
        this.retryDelay = java.util.Objects.requireNonNull(builder.retryDelay, "retryDelay");
        this.compressionEnabled = builder.compressionEnabled;

        // Work on a private copy so the fallback labels never end up in the builder
        // and the builder can be reused without affecting this instance.
        Map<String, String> labels = new HashMap<>(builder.defaultLabels);
        if (labels.isEmpty()) {
            labels.put("job", "java-application");
            labels.put("language", "java");
        }
        this.defaultLabels = java.util.Collections.unmodifiableMap(labels);
        this.auth = builder.auth;
    }

    /** Fluent builder; every option has a default, so {@code new Builder().build()} is valid. */
    public static class Builder {
        private String url = "http://localhost:3100";
        private String tenantId;
        private Duration timeout = Duration.ofSeconds(30);
        private int maxBatchSize = 1000;
        private Duration batchTimeout = Duration.ofSeconds(5);
        private int maxRetries = 3;
        private Duration retryDelay = Duration.ofSeconds(1);
        private boolean compressionEnabled = true;
        private final Map<String, String> defaultLabels = new HashMap<>();
        private AuthConfig auth;

        /** Base URL of the Loki server, e.g. {@code http://localhost:3100}. */
        public Builder url(String url) {
            this.url = url;
            return this;
        }

        /** Tenant identifier; {@code null} (the default) means no tenant is configured. */
        public Builder tenantId(String tenantId) {
            this.tenantId = tenantId;
            return this;
        }

        /** HTTP timeout applied by the clients. */
        public Builder timeout(Duration timeout) {
            this.timeout = timeout;
            return this;
        }

        /** Number of buffered entries that triggers a flush. */
        public Builder maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        /** Interval between scheduled flushes of the buffered entries. */
        public Builder batchTimeout(Duration batchTimeout) {
            this.batchTimeout = batchTimeout;
            return this;
        }

        /** Retry budget for a failed push. */
        public Builder maxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        /** Pause between retries. */
        public Builder retryDelay(Duration retryDelay) {
            this.retryDelay = retryDelay;
            return this;
        }

        /** Whether push request bodies are gzip-compressed. */
        public Builder compressionEnabled(boolean compressionEnabled) {
            this.compressionEnabled = compressionEnabled;
            return this;
        }

        /** Adds (or overwrites) a single default label. */
        public Builder defaultLabel(String key, String value) {
            this.defaultLabels.put(key, value);
            return this;
        }

        /** Adds all given labels to the defaults collected so far. */
        public Builder defaultLabels(Map<String, String> labels) {
            this.defaultLabels.putAll(labels);
            return this;
        }

        /** Authentication settings; {@code null} (the default) means no authentication. */
        public Builder auth(AuthConfig auth) {
            this.auth = auth;
            return this;
        }

        /**
         * Creates the config. When no default label was supplied, the labels
         * {@code job=java-application} and {@code language=java} are used.
         *
         * @throws NullPointerException if the URL or any duration was set to {@code null}
         */
        public LokiConfig build() {
            return new LokiConfig(this);
        }
    }

    public String getUrl() { return url; }

    public String getTenantId() { return tenantId; }

    public Duration getTimeout() { return timeout; }

    public int getMaxBatchSize() { return maxBatchSize; }

    public Duration getBatchTimeout() { return batchTimeout; }

    public int getMaxRetries() { return maxRetries; }

    public Duration getRetryDelay() { return retryDelay; }

    public boolean isCompressionEnabled() { return compressionEnabled; }

    /** Returns the default labels as an unmodifiable map. */
    public Map<String, String> getDefaultLabels() { return defaultLabels; }

    public AuthConfig getAuth() { return auth; }
}

AuthConfig.java - Authentication configuration

/**
 * Immutable description of how requests to Loki are authenticated.
 * Build instances with {@link Builder}; without any builder call the type is
 * {@link AuthType#NONE}.
 */
public class AuthConfig {

    /** The authentication schemes a config can select. */
    public enum AuthType {
        NONE, BASIC, BEARER, TOKEN_FILE
    }

    private final AuthType type;
    private final String username;
    private final String password;
    private final String token;
    private final String tokenFile;

    /** Takes over whatever the builder has collected, unchanged. */
    private AuthConfig(Builder source) {
        type = source.type;
        username = source.username;
        password = source.password;
        token = source.token;
        tokenFile = source.tokenFile;
    }

    public AuthType getType() {
        return type;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String getToken() {
        return token;
    }

    public String getTokenFile() {
        return tokenFile;
    }

    /**
     * Fluent builder. Each scheme method sets the type and its own credentials;
     * values stored by an earlier scheme call are left as they are.
     */
    public static class Builder {
        private AuthType type = AuthType.NONE;
        private String username;
        private String password;
        private String token;
        private String tokenFile;

        /** Selects {@link AuthType#BASIC} with the given user name and password. */
        public Builder basicAuth(String username, String password) {
            this.username = username;
            this.password = password;
            this.type = AuthType.BASIC;
            return this;
        }

        /** Selects {@link AuthType#BEARER} with the given token. */
        public Builder bearerToken(String token) {
            this.token = token;
            this.type = AuthType.BEARER;
            return this;
        }

        /** Selects {@link AuthType#TOKEN_FILE} with the given file path. */
        public Builder tokenFile(String tokenFile) {
            this.tokenFile = tokenFile;
            this.type = AuthType.TOKEN_FILE;
            return this;
        }

        /** Creates the immutable config from the current builder state. */
        public AuthConfig build() {
            return new AuthConfig(this);
        }
    }
}

2. Data Models

LogEntry.java - Represents a single log entry

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
 * One log line together with its timestamp and the labels of the stream it belongs to.
 * Timestamp and line are fixed at construction; the label map can still be extended
 * through {@link #withLabel} and {@link #withLabels}, which modify this entry in place.
 */
public class LogEntry {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    private final Instant timestamp;
    private final String line;
    private final Map<String, String> labels;

    /** Creates an unlabeled entry stamped with the current time. */
    public LogEntry(String line) {
        this(Instant.now(), line);
    }

    /** Creates an unlabeled entry with an explicit timestamp. */
    public LogEntry(Instant timestamp, String line) {
        this(timestamp, line, new HashMap<>());
    }

    /**
     * Creates an entry with its own copy of the given labels.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public LogEntry(Instant timestamp, String line, Map<String, String> labels) {
        Objects.requireNonNull(timestamp);
        Objects.requireNonNull(line);
        this.timestamp = timestamp;
        this.line = line;
        this.labels = new HashMap<>(labels);
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public String getLine() {
        return line;
    }

    /** Returns the live label map of this entry (not a copy). */
    public Map<String, String> getLabels() {
        return labels;
    }

    /** Adds or replaces one label on this entry and returns the same entry for chaining. */
    public LogEntry withLabel(String key, String value) {
        labels.put(key, value);
        return this;
    }

    /** Adds all given labels to this entry and returns the same entry for chaining. */
    public LogEntry withLabels(Map<String, String> additionalLabels) {
        labels.putAll(additionalLabels);
        return this;
    }

    /** Returns the timestamp as nanoseconds since the Unix epoch. */
    public long getTimestampNs() {
        long wholeSeconds = timestamp.getEpochSecond();
        int nanoAdjustment = timestamp.getNano();
        return wholeSeconds * NANOS_PER_SECOND + nanoAdjustment;
    }

    @Override
    public String toString() {
        return "LogEntry{timestamp=" + timestamp
            + ", line='" + line
            + "', labels=" + labels
            + "}";
    }
}

LogStream.java - Represents a Loki log stream (set of entries with same labels)

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * A Loki stream: one label set plus the entries recorded under it.
 * Each entry is kept as a two-element array {@code [timestampNanosAsString, line]}.
 */
public class LogStream {
    private final Map<String, String> stream;
    private final List<Object[]> values = new ArrayList<>();

    /**
     * @param stream the labels identifying this stream; stored as given, not copied
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    public LogStream(Map<String, String> stream) {
        this.stream = Objects.requireNonNull(stream);
    }

    /** Appends one entry, storing its nanosecond timestamp in decimal string form. */
    public void addEntry(LogEntry entry) {
        String timestampNs = Long.toString(entry.getTimestampNs());
        Object[] pair = {timestampNs, entry.getLine()};
        values.add(pair);
    }

    /** Appends every entry of the list, in list order. */
    public void addEntries(List<LogEntry> entries) {
        entries.forEach(this::addEntry);
    }

    /** Returns {@code true} while no entry has been added. */
    public boolean isEmpty() {
        return values.isEmpty();
    }

    /** Returns the number of entries added so far. */
    public int size() {
        return values.size();
    }

    public Map<String, String> getStream() {
        return stream;
    }

    /** Returns the live list of {@code [timestamp, line]} pairs. */
    public List<Object[]> getValues() {
        return values;
    }
}

LokiPushRequest.java - Loki push API request model

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
/**
 * JSON body of a Loki push call: a {@code streams} array whose elements each
 * carry a label map ({@code stream}) and a list of {@code [timestamp, line]} pairs
 * ({@code values}).
 */
public class LokiPushRequest {

    @JsonProperty("streams")
    private final List<LokiStream> streams = new ArrayList<>();

    /** Appends a stream to the request. */
    public void addStream(LokiStream stream) {
        this.streams.add(stream);
    }

    /** Returns the live list of streams added so far. */
    public List<LokiStream> getStreams() {
        return this.streams;
    }

    /** One element of the {@code streams} array. */
    public static class LokiStream {

        @JsonProperty("stream")
        private final java.util.Map<String, String> stream;

        @JsonProperty("values")
        private final List<List<String>> values = new ArrayList<>();

        /** @param stream the label set of this stream; stored as given, not copied */
        public LokiStream(java.util.Map<String, String> stream) {
            this.stream = stream;
        }

        /** Appends one {@code [timestamp, logLine]} pair; neither element may be {@code null}. */
        public void addValue(String timestamp, String logLine) {
            List<String> pair = List.of(timestamp, logLine);
            this.values.add(pair);
        }

        public java.util.Map<String, String> getStream() {
            return this.stream;
        }

        public List<List<String>> getValues() {
            return this.values;
        }
    }
}

LokiResponse.java - Loki API response model

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Mutable holder for a JSON response from the Loki API. Properties not listed
 * here are ignored when a response is parsed.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class LokiResponse {

    @JsonProperty("status")
    private String status;

    @JsonProperty("data")
    private Object data;

    @JsonProperty("error")
    private String error;

    @JsonProperty("errorType")
    private String errorType;

    @JsonProperty("warnings")
    private String[] warnings;

    /** Returns {@code true} only when the status is exactly {@code "success"}. */
    public boolean isSuccess() {
        return status != null && status.equals("success");
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public String getErrorType() {
        return errorType;
    }

    public void setErrorType(String errorType) {
        this.errorType = errorType;
    }

    public String[] getWarnings() {
        return warnings;
    }

    public void setWarnings(String[] warnings) {
        this.warnings = warnings;
    }
}

3. Loki Client Implementation

LokiClient.java - Main client for Loki API interactions

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.entity.DeflateCompress;
import org.apache.hc.client5.http.entity.GzipCompress;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicLong;
public class LokiClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LokiClient.class);
private static final AtomicLong REQUEST_COUNTER = new AtomicLong();
private final LokiConfig config;
private final CloseableHttpClient httpClient;
private final ObjectMapper objectMapper;
private final String pushUrl;
public LokiClient(LokiConfig config) {
this.config = config;
this.objectMapper = new ObjectMapper();
this.pushUrl = config.getUrl() + "/loki/api/v1/push";
this.httpClient = HttpClients.custom()
.setDefaultRequestConfig(createRequestConfig())
.build();
}
private RequestConfig createRequestConfig() {
return RequestConfig.custom()
.setResponseTimeout(Timeout.of(config.getTimeout()))
.setConnectionRequestTimeout(Timeout.of(config.getTimeout()))
.build();
}
public LokiResponse pushLogs(LogStream... streams) {
return pushLogs(java.util.List.of(streams));
}
public LokiResponse pushLogs(java.util.List<LogStream> streams) {
if (streams.isEmpty()) {
logger.debug("No streams to push");
return createSuccessResponse();
}
LokiPushRequest request = createPushRequest(streams);
return executePushRequest(request);
}
private LokiPushRequest createPushRequest(java.util.List<LogStream> streams) {
LokiPushRequest request = new LokiPushRequest();
for (LogStream stream : streams) {
if (!stream.isEmpty()) {
LokiPushRequest.LokiStream lokiStream = 
new LokiPushRequest.LokiStream(stream.getStream());
for (Object[] value : stream.getValues()) {
lokiStream.addValue((String) value[0], (String) value[1]);
}
request.addStream(lokiStream);
}
}
return request;
}
private LokiResponse executePushRequest(LokiPushRequest request) {
long requestId = REQUEST_COUNTER.incrementAndGet();
try {
String jsonBody = objectMapper.writeValueAsString(request);
HttpPost httpPost = createHttpPost(requestId, jsonBody);
logger.debug("Sending Loki push request {} with {} streams", 
requestId, request.getStreams().size());
return executeWithRetry(httpPost, requestId);
} catch (Exception e) {
logger.error("Failed to push logs to Loki (request {}): {}", requestId, e.getMessage());
return createErrorResponse("Request failed: " + e.getMessage());
}
}
private HttpPost createHttpPost(long requestId, String jsonBody) {
HttpPost httpPost = new HttpPost(pushUrl);
// Set headers
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("User-Agent", "Java-Loki-Client/1.0");
httpPost.setHeader("X-Request-Id", String.valueOf(requestId));
// Set tenant ID if provided
if (config.getTenantId() != null && !config.getTenantId().isEmpty()) {
httpPost.setHeader("X-Scope-OrgID", config.getTenantId());
}
// Configure authentication
configureAuth(httpPost);
// Configure compression
StringEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON);
if (config.isCompressionEnabled()) {
httpPost.setEntity(new GzipCompress(entity));
httpPost.setHeader("Content-Encoding", "gzip");
} else {
httpPost.setEntity(entity);
}
return httpPost;
}
private void configureAuth(HttpPost httpPost) {
AuthConfig auth = config.getAuth();
if (auth != null) {
switch (auth.getType()) {
case BASIC:
String credentials = auth.getUsername() + ":" + auth.getPassword();
String encoded = Base64.getEncoder().encodeToString(credentials.getBytes());
httpPost.setHeader("Authorization", "Basic " + encoded);
break;
case BEARER:
httpPost.setHeader("Authorization", "Bearer " + auth.getToken());
break;
case TOKEN_FILE:
// Implementation would read token from file
logger.warn("Token file authentication not yet implemented");
break;
case NONE:
default:
// No authentication
break;
}
}
}
private LokiResponse executeWithRetry(HttpPost httpPost, long requestId) {
int attempt = 0;
Exception lastException = null;
while (attempt <= config.getMaxRetries()) {
attempt++;
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
String responseBody = EntityUtils.toString(response.getEntity());
LokiResponse lokiResponse = objectMapper.readValue(responseBody, LokiResponse.class);
if (lokiResponse.isSuccess()) {
logger.debug("Successfully pushed logs to Loki (request {})", requestId);
return lokiResponse;
} else {
logger.warn("Loki API returned error (request {}, attempt {}): {} - {}", 
requestId, attempt, lokiResponse.getErrorType(), lokiResponse.getError());
if (attempt == config.getMaxRetries()) {
return lokiResponse;
}
}
} catch (Exception e) {
lastException = e;
logger.warn("Failed to push logs to Loki (request {}, attempt {}): {}", 
requestId, attempt, e.getMessage());
if (attempt == config.getMaxRetries()) {
break;
}
// Wait before retry
try {
Thread.sleep(config.getRetryDelay().toMillis());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
return createErrorResponse("All retry attempts failed: " + 
(lastException != null ? lastException.getMessage() : "Unknown error"));
}
private LokiResponse createSuccessResponse() {
LokiResponse response = new LokiResponse();
response.setStatus("success");
return response;
}
private LokiResponse createErrorResponse(String error) {
LokiResponse response = new LokiResponse();
response.setStatus("error");
response.setError(error);
return response;
}
@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
}
}

4. Batch Log Collector

BatchLogCollector.java - Collects and batches logs for efficient sending

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Buffers log entries per label set and ships them to Loki in batches.
 *
 * <p>A batch is sent when the number of buffered entries reaches
 * {@link LokiConfig#getMaxBatchSize()} or when the periodic flusher fires
 * (every {@link LokiConfig#getBatchTimeout()}). The buffer is only touched while
 * holding an internal lock; the HTTP push itself always happens outside that lock so
 * that a slow or retrying push does not block other threads calling
 * {@link #addLog(LogEntry)}.
 *
 * <p>The labels of every entry are merged with {@link LokiConfig#getDefaultLabels()};
 * labels set on the entry win over defaults with the same key.
 *
 * <p>Batches that Loki rejects are logged and dropped.
 */
public class BatchLogCollector implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(BatchLogCollector.class);

    private final LokiClient lokiClient;
    private final LokiConfig config;
    /** Pending streams keyed by their full label set; guarded by {@link #batchLock}. */
    private final Map<Map<String, String>, LogStream> currentBatch;
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean running;
    private final Object batchLock = new Object();
    /** Number of entries currently buffered; guarded by {@link #batchLock}. */
    private int pendingEntries;
    private ScheduledFuture<?> flushTask;

    /**
     * Creates the collector, its own {@link LokiClient} and a daemon flusher thread.
     *
     * @param config batching and connection settings
     */
    public BatchLogCollector(LokiConfig config) {
        this.config = config;
        this.lokiClient = new LokiClient(config);
        this.currentBatch = new HashMap<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "loki-batch-flusher");
            t.setDaemon(true);
            return t;
        });
        this.running = new AtomicBoolean(true);
        startBatchFlusher();
    }

    /**
     * Buffers one entry and, if the batch is now full, sends the batch on the calling thread.
     * Entries offered after {@link #close()} are ignored with a warning.
     */
    public void addLog(LogEntry entry) {
        if (!running.get()) {
            logger.warn("Batch collector is stopped, ignoring log entry");
            return;
        }

        // Keying by the label map itself (value equality) cannot collide the way a
        // concatenated "k=v|k=v" string can when values contain '|' or '='.
        Map<String, String> streamLabels = new HashMap<>();
        if (config.getDefaultLabels() != null) {
            streamLabels.putAll(config.getDefaultLabels());
        }
        streamLabels.putAll(entry.getLabels());

        List<LogStream> fullBatch = null;
        synchronized (batchLock) {
            LogStream stream = currentBatch.get(streamLabels);
            if (stream == null) {
                // The stream gets its own copy so the map key is never mutated.
                stream = new LogStream(new HashMap<>(streamLabels));
                currentBatch.put(streamLabels, stream);
            }
            stream.addEntry(entry);
            pendingEntries++;
            if (pendingEntries >= config.getMaxBatchSize()) {
                fullBatch = drainBatch();
            }
        }

        if (fullBatch != null) {
            send(fullBatch);
        }
    }

    /** Buffers every entry of the list, in order, via {@link #addLog(LogEntry)}. */
    public void addLogs(List<LogEntry> entries) {
        for (LogEntry entry : entries) {
            addLog(entry);
        }
    }

    /** Removes and returns everything buffered so far. Caller must hold {@link #batchLock}. */
    private List<LogStream> drainBatch() {
        if (currentBatch.isEmpty()) {
            return Collections.emptyList();
        }
        List<LogStream> drained = new ArrayList<>(currentBatch.values());
        currentBatch.clear();
        pendingEntries = 0;
        return drained;
    }

    private void startBatchFlusher() {
        long intervalMs = config.getBatchTimeout().toMillis();
        flushTask = scheduler.scheduleAtFixedRate(
            this::flushBatch, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        logger.info("Started batch flusher with interval: {} ms", intervalMs);
    }

    /**
     * Sends everything buffered so far. The call returns after the push (including
     * any retries) has finished; failures are logged, not thrown.
     */
    public void flushBatch() {
        List<LogStream> streamsToSend;
        synchronized (batchLock) {
            streamsToSend = drainBatch();
        }
        send(streamsToSend);
    }

    /** Pushes the given streams and logs the outcome; never throws. */
    private void send(List<LogStream> streamsToSend) {
        if (streamsToSend.isEmpty()) {
            return;
        }
        try {
            LokiResponse response = lokiClient.pushLogs(streamsToSend);
            if (response.isSuccess()) {
                logger.debug("Successfully flushed {} streams to Loki", streamsToSend.size());
            } else {
                // In production, you might want to implement a dead letter queue here
                logger.error("Failed to flush batch to Loki: {}", response.getError());
            }
        } catch (Exception e) {
            logger.error("Exception while flushing batch to Loki: {}", e.getMessage(), e);
        }
    }

    /**
     * Flushes the buffer. {@link #flushBatch()} already sends synchronously, so there
     * is nothing further to wait for once it returns.
     */
    public void flushAndWait() {
        flushBatch();
    }

    /**
     * Stops the periodic flusher, sends whatever is still buffered and closes the
     * Loki client. Calling it more than once has no further effect.
     */
    @Override
    public void close() {
        if (!running.compareAndSet(true, false)) {
            return;
        }
        logger.info("Shutting down batch log collector");
        if (flushTask != null) {
            flushTask.cancel(false);
        }
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }

        // Final flush for entries added since the last scheduled run.
        flushBatch();

        try {
            lokiClient.close();
        } catch (Exception e) {
            logger.error("Error closing Loki client: {}", e.getMessage(), e);
        }
    }
}

5. SLF4J Appender

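LokiSlf4jAppender.java - Helper that forwards log events (including the SLF4J MDC context) to Loki. Note that it is not wired into a logging backend: application code calls its log(...) method directly.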

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
/**
 * Forwards log events to Loki through a {@link BatchLogCollector}.
 *
 * <p>Every event is labeled with the application name, the {@code app.env} system
 * property (default {@code development}), the local host name, the level, the logger
 * name and all entries of the current SLF4J {@link MDC} map.
 *
 * <p>The class is {@link AutoCloseable} so it can be used in try-with-resources;
 * closing it closes the collector.
 */
public class LokiSlf4jAppender implements AutoCloseable {
    private final BatchLogCollector collector;
    private final String applicationName;
    private final Map<String, String> staticLabels;

    /**
     * @param config          Loki connection and batching settings
     * @param applicationName value of the {@code app} label
     */
    public LokiSlf4jAppender(LokiConfig config, String applicationName) {
        this.collector = new BatchLogCollector(config);
        this.applicationName = applicationName;
        this.staticLabels = new HashMap<>();
        staticLabels.put("app", applicationName);
        staticLabels.put("environment", System.getProperty("app.env", "development"));
        staticLabels.put("host", getHostName());
    }

    /**
     * Queues one log event for delivery.
     *
     * @param loggerName value of the {@code logger} label
     * @param level      value of the {@code level} label
     * @param message    the log message
     * @param throwable  optional exception; its stack trace (including causes) is
     *                   appended to the message on a new line
     */
    public void log(String loggerName, String level, String message, Throwable throwable) {
        Map<String, String> labels = new HashMap<>(staticLabels);
        labels.put("level", level);
        labels.put("logger", loggerName);

        // NOTE(review): every distinct MDC value becomes a distinct label value; MDC keys
        // such as request or trace ids would create many streams - confirm this is intended.
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        if (mdcContext != null) {
            labels.putAll(mdcContext);
        }

        StringBuilder logLine = new StringBuilder();
        logLine.append(message);
        if (throwable != null) {
            logLine.append("\n").append(throwableToString(throwable));
        }

        collector.addLog(new LogEntry(Instant.now(), logLine.toString(), labels));
    }

    /**
     * Renders the full stack trace. {@link Throwable#printStackTrace(java.io.PrintWriter)}
     * is used so that "Caused by" chains and suppressed exceptions are included.
     */
    private String throwableToString(Throwable throwable) {
        java.io.StringWriter buffer = new java.io.StringWriter();
        java.io.PrintWriter writer = new java.io.PrintWriter(buffer);
        throwable.printStackTrace(writer);
        writer.flush();
        return buffer.toString();
    }

    /** Returns the local host name, or {@code "unknown"} when it cannot be resolved. */
    private String getHostName() {
        try {
            return java.net.InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            return "unknown";
        }
    }

    /** Closes the underlying collector. */
    @Override
    public void close() {
        collector.close();
    }

    /** Builder for convenient configuration; falls back to a default {@link LokiConfig}. */
    public static class Builder {
        private LokiConfig config;
        private String applicationName = "unknown-app";

        public Builder config(LokiConfig config) {
            this.config = config;
            return this;
        }

        public Builder applicationName(String applicationName) {
            this.applicationName = applicationName;
            return this;
        }

        /** Creates the appender, using {@code new LokiConfig.Builder().build()} if no config was set. */
        public LokiSlf4jAppender build() {
            if (config == null) {
                config = new LokiConfig.Builder().build();
            }
            return new LokiSlf4jAppender(config, applicationName);
        }
    }
}

6. Log Query Client

LokiQueryClient.java - Client for querying logs from Loki

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.net.URIBuilder;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
public class LokiQueryClient implements AutoCloseable {
private final LokiConfig config;
private final CloseableHttpClient httpClient;
private final ObjectMapper objectMapper;
public LokiQueryClient(LokiConfig config) {
this.config = config;
this.httpClient = HttpClients.createDefault();
this.objectMapper = new ObjectMapper();
}
public LokiResponse queryRange(String query, Instant start, Instant end, String step) {
try {
URI uri = new URIBuilder(config.getUrl() + "/loki/api/v1/query_range")
.addParameter("query", query)
.addParameter("start", String.valueOf(start.toEpochMilli() * 1_000_000))
.addParameter("end", String.valueOf(end.toEpochMilli() * 1_000_000))
.addParameter("step", step)
.build();
return executeQuery(uri);
} catch (Exception e) {
throw new RuntimeException("Query failed", e);
}
}
public LokiResponse query(String query, Instant time) {
try {
URI uri = new URIBuilder(config.getUrl() + "/loki/api/v1/query")
.addParameter("query", query)
.addParameter("time", String.valueOf(time.toEpochMilli() * 1_000_000))
.build();
return executeQuery(uri);
} catch (Exception e) {
throw new RuntimeException("Query failed", e);
}
}
public LokiResponse labels(Instant start, Instant end) {
try {
URI uri = new URIBuilder(config.getUrl() + "/loki/api/v1/labels")
.addParameter("start", String.valueOf(start.toEpochMilli() * 1_000_000))
.addParameter("end", String.valueOf(end.toEpochMilli() * 1_000_000))
.build();
return executeQuery(uri);
} catch (Exception e) {
throw new RuntimeException("Labels query failed", e);
}
}
private LokiResponse executeQuery(URI uri) throws IOException {
HttpGet httpGet = new HttpGet(uri);
// Configure authentication (similar to LokiClient)
configureAuth(httpGet);
if (config.getTenantId() != null) {
httpGet.setHeader("X-Scope-OrgID", config.getTenantId());
}
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
String responseBody = EntityUtils.toString(response.getEntity());
return objectMapper.readValue(responseBody, LokiResponse.class);
}
}
private void configureAuth(HttpGet httpGet) {
// Similar implementation to LokiClient.configureAuth()
}
@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
}
}

7. Metrics and Monitoring

LokiMetrics.java - Metrics collection for monitoring

import java.util.concurrent.atomic.AtomicLong;
/**
 * Counters describing what a Loki client has sent and what has failed.
 * All counters are {@link AtomicLong}s and start at zero.
 */
public class LokiMetrics {
    private final AtomicLong logsSent = new AtomicLong();
    private final AtomicLong logsFailed = new AtomicLong();
    private final AtomicLong batchesSent = new AtomicLong();
    private final AtomicLong batchesFailed = new AtomicLong();
    private final AtomicLong retries = new AtomicLong();

    /** Adds {@code count} to the number of log entries delivered. */
    public void incrementLogsSent(long count) {
        logsSent.addAndGet(count);
    }

    /** Adds {@code count} to the number of log entries that could not be delivered. */
    public void incrementLogsFailed(long count) {
        logsFailed.addAndGet(count);
    }

    /** Records one delivered batch. */
    public void incrementBatchesSent() {
        batchesSent.incrementAndGet();
    }

    /** Records one failed batch. */
    public void incrementBatchesFailed() {
        batchesFailed.incrementAndGet();
    }

    /** Records one retry. */
    public void incrementRetries() {
        retries.incrementAndGet();
    }

    public long getLogsSent() { return logsSent.get(); }

    public long getLogsFailed() { return logsFailed.get(); }

    public long getBatchesSent() { return batchesSent.get(); }

    public long getBatchesFailed() { return batchesFailed.get(); }

    public long getRetries() { return retries.get(); }

    /**
     * Returns the fraction of log entries delivered, {@code sent / (sent + failed)},
     * or {@code 1.0} when nothing has been recorded yet.
     */
    public double getSuccessRate() {
        // Read each counter once: re-reading logsSent after computing the total could
        // pair a newer numerator with an older denominator and yield a rate above 1.0.
        long sent = logsSent.get();
        long failed = logsFailed.get();
        long total = sent + failed;
        return total > 0 ? (double) sent / total : 1.0;
    }
}

8. Demonstration and Usage

LokiDemo.java - Complete demonstration

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Runnable walkthrough of the Loki classes: plain batching, an authenticated
 * multi-tenant setup, the SLF4J helper and a range query. Each example reports
 * its own errors on {@code System.err} and lets the next example run.
 */
public class LokiDemo {

    public static void main(String[] args) {
        // Example 1: Basic configuration
        basicUsageExample();
        // Example 2: Advanced configuration with authentication
        advancedUsageExample();
        // Example 3: SLF4J integration
        slf4jIntegrationExample();
        // Example 4: Querying logs
        queryExample();
    }

    /** Sleeps for the given time; if interrupted, restores the interrupt flag and returns early. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends ten labeled entries through a collector with a 2-second flush interval. */
    private static void basicUsageExample() {
        System.out.println("=== Basic Usage Example ===");
        LokiConfig config = new LokiConfig.Builder()
            .url("http://localhost:3100")
            .maxBatchSize(500)
            .batchTimeout(java.time.Duration.ofSeconds(2))
            .defaultLabel("environment", "demo")
            .defaultLabel("version", "1.0.0")
            .build();

        try (BatchLogCollector collector = new BatchLogCollector(config)) {
            for (int i = 0; i < 10; i++) {
                LogEntry entry = new LogEntry(
                    Instant.now(),
                    "Demo log message " + i + " from basic example"
                ).withLabel("service", "demo-service")
                 .withLabel("instance", "instance-1");
                collector.addLog(entry);
            }
            // Wait for batch to be sent
            pause(3000);
        } catch (Exception e) {
            System.err.println("Error in basic example: " + e.getMessage());
        }
    }

    /** Emits a heartbeat entry every second for ten seconds using basic auth and a tenant id. */
    private static void advancedUsageExample() {
        System.out.println("\n=== Advanced Usage Example ===");
        AuthConfig auth = new AuthConfig.Builder()
            .basicAuth("user", "password")
            .build();
        LokiConfig config = new LokiConfig.Builder()
            .url("http://loki.example.com")
            .tenantId("tenant-1")
            .auth(auth)
            .maxBatchSize(1000)
            .batchTimeout(java.time.Duration.ofSeconds(5))
            .maxRetries(5)
            .retryDelay(java.time.Duration.ofSeconds(2))
            .compressionEnabled(true)
            .defaultLabels(Map.of(
                "environment", "production",
                "cluster", "k8s-cluster-1",
                "region", "us-west-2"
            ))
            .build();

        // The executor is shut down explicitly instead of via try-with-resources, because
        // ExecutorService only implements AutoCloseable on newer Java releases.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try (BatchLogCollector collector = new BatchLogCollector(config)) {
            // Simulate continuous logging
            scheduler.scheduleAtFixedRate(() -> {
                LogEntry entry = new LogEntry(
                    Instant.now(),
                    "Heartbeat message - system is healthy"
                ).withLabel("component", "heartbeat")
                 .withLabel("level", "INFO");
                collector.addLog(entry);
            }, 0, 1, TimeUnit.SECONDS);

            // Run for 10 seconds
            pause(10000);

            // Stop the heartbeat before the collector is closed at the end of this block.
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Error in advanced example: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("Error in advanced example: " + e.getMessage());
        } finally {
            scheduler.shutdownNow();
        }
    }

    /** Logs one INFO and one ERROR event (with exception) through the SLF4J helper. */
    private static void slf4jIntegrationExample() {
        System.out.println("\n=== SLF4J Integration Example ===");
        LokiConfig config = new LokiConfig.Builder()
            .url("http://localhost:3100")
            .build();

        LokiSlf4jAppender appender = null;
        try {
            appender = new LokiSlf4jAppender.Builder()
                .config(config)
                .applicationName("my-spring-app")
                .build();
            // Simulate logging through SLF4J
            appender.log("com.example.MyClass", "INFO", "Application started successfully", null);
            appender.log("com.example.MyClass", "ERROR", "Failed to process request",
                new RuntimeException("Database connection failed"));
            // Wait for logs to be sent
            pause(2000);
        } catch (Exception e) {
            System.err.println("Error in SLF4J example: " + e.getMessage());
        } finally {
            // Explicit close keeps the example independent of try-with-resources support.
            if (appender != null) {
                appender.close();
            }
        }
    }

    /** Queries the ERROR-level entries of the last hour with a 300-second step. */
    private static void queryExample() {
        System.out.println("\n=== Query Example ===");
        LokiConfig config = new LokiConfig.Builder()
            .url("http://localhost:3100")
            .build();

        try (LokiQueryClient queryClient = new LokiQueryClient(config)) {
            Instant end = Instant.now();
            // Fully qualified: this file does not import ChronoUnit.
            Instant start = end.minus(1, java.time.temporal.ChronoUnit.HOURS);
            // Query for error logs in the last hour
            LokiResponse response = queryClient.queryRange(
                "{level=\"ERROR\"}",
                start,
                end,
                "300"
            );
            if (response.isSuccess()) {
                System.out.println("Query successful, found error logs");
                // Process response.getData()...
            } else {
                System.out.println("Query failed: " + response.getError());
            }
        } catch (Exception e) {
            System.err.println("Error in query example: " + e.getMessage());
        }
    }
}

Configuration Examples

application.properties

# Sample Loki settings.
# NOTE(review): none of the classes shown in this article load these keys; they
# presumably have to be mapped onto LokiConfig.Builder / AuthConfig.Builder by the
# application - confirm against the code that reads this file.

# Base URL of the Loki server and the tenant to write to.
loki.url=http://localhost:3100
loki.tenantId=my-tenant
# Batching: entries per batch and flush interval.
loki.batch.size=1000
loki.batch.timeout=5s
# Retry policy: number of retries and delay between them.
loki.retry.max=3
loki.retry.delay=1s
# Authentication. Placeholder credentials - do not keep real passwords in a
# properties file under version control.
loki.auth.type=basic
loki.auth.username=user
loki.auth.password=pass

Best Practices

  1. Use appropriate batch sizes - Balance between latency and throughput
  2. Implement proper error handling - Use dead letter queues for failed logs
  3. Monitor your Loki client - Track success rates and failure patterns
  4. Use meaningful labels - But avoid high cardinality labels
  5. Implement circuit breakers - Prevent cascading failures
  6. Use structured logging - JSON format for complex log data
  7. Set appropriate retry policies - Exponential backoff for retries

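This implementation provides a complete starting point for a Loki log aggregation solution in Java applications, covering both log shipping and querying. Before running it in production, add the safeguards listed above - in particular a dead letter queue for rejected batches, a circuit breaker, and exponential backoff for retries.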

Java Observability, Logging Intelligence & AI-Driven Monitoring (APM, Tracing, Logs & Anomaly Detection)

https://macronepal.com/blog/beyond-metrics-observing-serverless-and-traditional-java-applications-with-thundra-apm/
Explains using Thundra APM to observe both serverless and traditional Java applications by combining tracing, metrics, and logs into a unified observability platform for faster debugging and performance insights.

https://macronepal.com/blog/dynatrace-oneagent-in-java-2/
Explains Dynatrace OneAgent for Java, which automatically instruments JVM applications to capture metrics, traces, and logs, enabling full-stack monitoring and root-cause analysis with minimal configuration.

https://macronepal.com/blog/lightstep-java-sdk-distributed-tracing-and-observability-implementation/
Explains Lightstep Java SDK for distributed tracing, helping developers track requests across microservices and identify latency issues using OpenTelemetry-based observability.

https://macronepal.com/blog/honeycomb-io-beeline-for-java-complete-guide-2/
Explains Honeycomb Beeline for Java, which provides high-cardinality observability and deep query capabilities to understand complex system behavior and debug distributed systems efficiently.

https://macronepal.com/blog/lumigo-for-serverless-in-java-complete-distributed-tracing-guide-2/
Explains Lumigo for Java serverless applications, offering automatic distributed tracing, log correlation, and error tracking to simplify debugging in cloud-native environments. (Lumigo Docs)

https://macronepal.com/blog/from-noise-to-signals-implementing-log-anomaly-detection-in-java-applications/
Explains how to detect anomalies in Java logs using behavioral patterns and machine learning techniques to separate meaningful incidents from noisy log data and improve incident response.

https://macronepal.com/blog/ai-powered-log-analysis-in-java-from-reactive-debugging-to-proactive-insights/
Explains AI-driven log analysis for Java applications, shifting from manual debugging to predictive insights that identify issues early and improve system reliability using intelligent log processing.

https://macronepal.com/blog/titliel-java-logging-best-practices/
Explains best practices for Java logging, focusing on structured logs, proper log levels, performance optimization, and ensuring logs are useful for debugging and observability systems.

https://macronepal.com/blog/seeking-a-loguru-for-java-the-quest-for-elegant-and-simple-logging/
Explains the search for simpler, more elegant logging frameworks in Java, comparing modern logging approaches that aim to reduce complexity while improving readability and developer experience.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper