A comprehensive guide to implementing Elasticsearch log indexing in Java applications with advanced features like bulk processing, error handling, and performance optimization.
Project Overview
This implementation provides a robust Elasticsearch logging solution that can handle high-volume log data with efficient indexing, search capabilities, and real-time analytics.
Architecture
graph TB
    A[Java Application] --> B[Log Collector]
    B --> C[Log Processor]
    C --> D[Elasticsearch Client]
    D --> E[Elasticsearch Cluster]
    F[Monitoring] --> D
    G[Kibana] --> E
    subgraph "Elasticsearch Cluster"
        E --> E1[Node 1]
        E --> E2[Node 2]
        E --> E3[Node 3]
    end
Dependencies
Maven Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<groupId>com.elasticsearch</groupId>
<artifactId>log-indexer</artifactId>
<version>1.0.0</version>
<!-- Centralized version properties so all modules stay in sync -->
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<elasticsearch.version>8.9.0</elasticsearch.version>
<spring-boot.version>3.1.0</spring-boot.version>
<log4j.version>2.20.0</log4j.version>
</properties>
<dependencies>
<!-- Elasticsearch Java Client (new typed client, replaces the deprecated High Level REST Client) -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.0</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Core Domain Models
package com.elasticsearch.logging.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class LogEntry {

    /** Unique document id; defaults to a random UUID at construction. */
    private String id;
    private String traceId;
    private String spanId;

    /** Event time, serialized as an ISO-8601 UTC string. */
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")
    private Instant timestamp;

    /** One of: DEBUG, INFO, WARN, ERROR, FATAL. */
    private String level;
    private String logger;
    private String message;
    private String thread;
    private String application;
    private String environment;
    private String hostname;
    private String serviceName;
    private String serviceVersion;

    /** Free-form business context (orderId, customerId, ...). */
    private Map<String, Object> context;

    /** Technical metadata not tied to the business event. */
    private Map<String, Object> metadata;

    private ExceptionInfo exception;

    /** Creates an entry with a fresh id and the current timestamp. */
    public LogEntry() {
        this.id = UUID.randomUUID().toString();
        this.timestamp = Instant.now();
    }

    /** Convenience constructor for the three mandatory fields. */
    public LogEntry(String level, String logger, String message) {
        this();
        this.level = level;
        this.logger = logger;
        this.message = message;
    }

    /** Entry point for fluent creation. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder.
     *
     * BUG FIX: the original builder had no way to set timestamp, thread,
     * hostname, serviceVersion or metadata even though callers (e.g. the
     * Log4j2 appender in this project) need them. The missing setters are
     * added below; all pre-existing methods are unchanged, so this is
     * backward compatible.
     */
    public static class Builder {
        private final LogEntry logEntry;

        public Builder() {
            this.logEntry = new LogEntry();
        }

        public Builder level(String level) {
            logEntry.level = level;
            return this;
        }

        public Builder logger(String logger) {
            logEntry.logger = logger;
            return this;
        }

        public Builder message(String message) {
            logEntry.message = message;
            return this;
        }

        public Builder traceId(String traceId) {
            logEntry.traceId = traceId;
            return this;
        }

        public Builder spanId(String spanId) {
            logEntry.spanId = spanId;
            return this;
        }

        public Builder application(String application) {
            logEntry.application = application;
            return this;
        }

        public Builder environment(String environment) {
            logEntry.environment = environment;
            return this;
        }

        public Builder serviceName(String serviceName) {
            logEntry.serviceName = serviceName;
            return this;
        }

        /** Overrides the auto-generated creation timestamp. */
        public Builder timestamp(Instant timestamp) {
            logEntry.timestamp = timestamp;
            return this;
        }

        /** Sets the originating thread name. */
        public Builder thread(String thread) {
            logEntry.thread = thread;
            return this;
        }

        public Builder hostname(String hostname) {
            logEntry.hostname = hostname;
            return this;
        }

        public Builder serviceVersion(String serviceVersion) {
            logEntry.serviceVersion = serviceVersion;
            return this;
        }

        public Builder context(Map<String, Object> context) {
            logEntry.context = context;
            return this;
        }

        public Builder metadata(Map<String, Object> metadata) {
            logEntry.metadata = metadata;
            return this;
        }

        /** Adds a single context key, lazily creating the map. */
        public Builder addContext(String key, Object value) {
            if (logEntry.context == null) {
                logEntry.context = new java.util.HashMap<>();
            }
            logEntry.context.put(key, value);
            return this;
        }

        /** Captures type, message and stack trace of a non-null throwable. */
        public Builder exception(Throwable throwable) {
            if (throwable != null) {
                logEntry.exception = new ExceptionInfo(throwable);
            }
            return this;
        }

        public LogEntry build() {
            return logEntry;
        }
    }

    // Getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getTraceId() { return traceId; }
    public void setTraceId(String traceId) { this.traceId = traceId; }
    public String getSpanId() { return spanId; }
    public void setSpanId(String spanId) { this.spanId = spanId; }
    public Instant getTimestamp() { return timestamp; }
    public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getLogger() { return logger; }
    public void setLogger(String logger) { this.logger = logger; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public String getThread() { return thread; }
    public void setThread(String thread) { this.thread = thread; }
    public String getApplication() { return application; }
    public void setApplication(String application) { this.application = application; }
    public String getEnvironment() { return environment; }
    public void setEnvironment(String environment) { this.environment = environment; }
    public String getHostname() { return hostname; }
    public void setHostname(String hostname) { this.hostname = hostname; }
    public String getServiceName() { return serviceName; }
    public void setServiceName(String serviceName) { this.serviceName = serviceName; }
    public String getServiceVersion() { return serviceVersion; }
    public void setServiceVersion(String serviceVersion) { this.serviceVersion = serviceVersion; }
    public Map<String, Object> getContext() { return context; }
    public void setContext(Map<String, Object> context) { this.context = context; }
    public Map<String, Object> getMetadata() { return metadata; }
    public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }
    public ExceptionInfo getException() { return exception; }
    public void setException(ExceptionInfo exception) { this.exception = exception; }
}
@JsonInclude(JsonInclude.Include.NON_NULL)
class ExceptionInfo {

    private String type;        // fully-qualified exception class name
    private String message;     // throwable message, may be null
    private String stackTrace;  // complete rendered stack trace
    private String cause;       // message of the immediate cause, if any

    /** No-arg constructor required for Jackson deserialization. */
    public ExceptionInfo() {}

    /**
     * Captures the throwable's class name, message, full stack trace and,
     * when present, the message of its immediate cause.
     */
    public ExceptionInfo(Throwable throwable) {
        this.type = throwable.getClass().getName();
        this.message = throwable.getMessage();
        this.stackTrace = renderStackTrace(throwable);
        Throwable directCause = throwable.getCause();
        if (directCause != null) {
            this.cause = directCause.getMessage();
        }
    }

    // Renders the complete stack trace into a single string.
    private String renderStackTrace(Throwable throwable) {
        java.io.StringWriter buffer = new java.io.StringWriter();
        try (java.io.PrintWriter writer = new java.io.PrintWriter(buffer)) {
            throwable.printStackTrace(writer);
        }
        return buffer.toString();
    }

    // Getters and setters
    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getStackTrace() {
        return stackTrace;
    }

    public void setStackTrace(String stackTrace) {
        this.stackTrace = stackTrace;
    }

    public String getCause() {
        return cause;
    }

    public void setCause(String cause) {
        this.cause = cause;
    }
}
// Immutable wrapper pairing a document with its target index and bulk action.
class BulkOperation<T> {

    private final String index;      // destination index name
    private final T document;        // payload to be written
    private final String operation;  // one of: index, create, update, delete

    public BulkOperation(String index, T document, String operation) {
        this.index = index;
        this.document = document;
        this.operation = operation;
    }

    // Getters
    public String getIndex() {
        return index;
    }

    public T getDocument() {
        return document;
    }

    public String getOperation() {
        return operation;
    }
}
Elasticsearch Configuration
package com.elasticsearch.logging.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Value("${elasticsearch.scheme:http}")
    private String scheme;

    @Value("${elasticsearch.username:}")
    private String username;

    @Value("${elasticsearch.password:}")
    private String password;

    @Value("${elasticsearch.api-key:}")
    private String apiKey;

    @Value("${elasticsearch.ssl.truststore.path:}")
    private String truststorePath;

    @Value("${elasticsearch.connection.timeout:30000}")
    private int connectionTimeout;

    @Value("${elasticsearch.socket.timeout:60000}")
    private int socketTimeout;

    /**
     * Builds the low-level REST client with authentication, optional TLS and
     * timeout configuration.
     *
     * BUG FIX: the original registered TWO separate HttpClientConfigCallbacks
     * (one for basic auth, one for SSL). RestClient.Builder keeps only the
     * last callback registered, so basic-auth credentials were silently
     * dropped whenever SSL was enabled. Both concerns are now configured in a
     * single callback.
     */
    @Bean
    public RestClient restClient() throws Exception {
        RestClient.Builder builder = RestClient.builder(
            new HttpHost(host, port, scheme)
        );

        // Basic authentication (username/password), prepared up front so the
        // single callback below can apply it together with SSL.
        final CredentialsProvider credentialsProvider;
        if (!username.isEmpty() && !password.isEmpty()) {
            credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(username, password)
            );
        } else {
            credentialsProvider = null;
        }

        // API-key authentication (sent as a default header; independent of
        // the HTTP client callback).
        if (!apiKey.isEmpty()) {
            builder.setDefaultHeaders(new Header[]{
                new BasicHeader("Authorization", "ApiKey " + apiKey)
            });
        }

        final boolean useSsl = "https".equals(scheme) && !truststorePath.isEmpty();

        // Single callback covering credentials AND SSL so neither overwrites
        // the other.
        if (credentialsProvider != null || useSsl) {
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                if (credentialsProvider != null) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                if (useSsl) {
                    try {
                        httpClientBuilder.setSSLContext(createSSLContext());
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to configure SSL context", e);
                    }
                }
                return httpClientBuilder;
            });
        }

        // Connect / socket timeouts.
        builder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder
                .setConnectTimeout(connectionTimeout)
                .setSocketTimeout(socketTimeout)
        );

        return builder.build();
    }

    /** Transport bridging the low-level REST client and the typed client. */
    @Bean
    public ElasticsearchTransport elasticsearchTransport(RestClient restClient) {
        return new RestClientTransport(restClient, new JacksonJsonpMapper());
    }

    /** Typed Elasticsearch client used by the application services. */
    @Bean
    public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {
        return new ElasticsearchClient(transport);
    }

    /**
     * Builds an SSLContext trusting the single certificate at truststorePath.
     *
     * BUG FIX: the certificate stream is now closed (try-with-resources); the
     * original leaked the FileInputStream.
     *
     * NOTE(review): the property is named "truststore.path" but the file is
     * read as a bare X.509 certificate, not a JKS/PKCS12 truststore — confirm
     * the intended file format with operations.
     */
    private SSLContext createSSLContext() throws Exception {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        X509Certificate certificate;
        try (FileInputStream certStream = new FileInputStream(truststorePath)) {
            certificate = (X509Certificate) factory.generateCertificate(certStream);
        }
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("elasticsearch", certificate);
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
            TrustManagerFactory.getDefaultAlgorithm()
        );
        trustManagerFactory.init(keyStore);
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        return sslContext;
    }
}
Elasticsearch Service Implementation
package com.elasticsearch.logging.service;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.elasticsearch.logging.model.LogEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class ElasticsearchLogService {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchLogService.class);
private static final DateTimeFormatter INDEX_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy.MM.dd");
private static final String LOG_INDEX_PREFIX = "logs-";
private static final int BULK_BATCH_SIZE = 1000;
private static final int BULK_FLUSH_INTERVAL_MS = 5000;
private final ElasticsearchClient client;
private final BlockingQueue<LogEntry> logQueue;
private final AtomicLong processedCount;
private final AtomicLong errorCount;
private final Map<String, Object> indexSettings;
@Value("${elasticsearch.index.prefix:logs}")
private String indexPrefix;
@Value("${elasticsearch.index.shards:3}")
private int numberOfShards;
@Value("${elasticsearch.index.replicas:1}")
private int numberOfReplicas;
@Value("${elasticsearch.bulk.batch-size:1000}")
private int bulkBatchSize;
@Value("${elasticsearch.bulk.flush-interval:5000}")
private long bulkFlushInterval;
@Autowired
public ElasticsearchLogService(ElasticsearchClient client) {
this.client = client;
this.logQueue = new LinkedBlockingQueue<>();
this.processedCount = new AtomicLong(0);
this.errorCount = new AtomicLong(0);
this.indexSettings = createIndexSettings();
// Initialize indices and start background processor
initializeIndices();
startBackgroundProcessor();
}
private Map<String, Object> createIndexSettings() {
Map<String, Object> settings = new HashMap<>();
Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put("number_of_shards", numberOfShards);
indexSettings.put("number_of_replicas", numberOfReplicas);
// Analysis settings for better search
Map<String, Object> analysis = new HashMap<>();
Map<String, Object> analyzer = new HashMap<>();
Map<String, Object> logMessageAnalyzer = new HashMap<>();
logMessageAnalyzer.put("type", "custom");
logMessageAnalyzer.put("tokenizer", "standard");
logMessageAnalyzer.put("filter", Arrays.asList("lowercase", "asciifolding"));
analyzer.put("log_message_analyzer", logMessageAnalyzer);
analysis.put("analyzer", analyzer);
indexSettings.put("analysis", analysis);
settings.put("index", indexSettings);
return settings;
}
private void initializeIndices() {
try {
// Check if today's index exists, create if not
String todayIndex = getIndexNameForDate(LocalDate.now());
createIndexIfNotExists(todayIndex);
logger.info("Elasticsearch log service initialized");
} catch (Exception e) {
logger.error("Failed to initialize Elasticsearch indices", e);
}
}
private void startBackgroundProcessor() {
Thread processorThread = new Thread(this::processQueue, "LogQueueProcessor");
processorThread.setDaemon(true);
processorThread.start();
}
// Public API Methods
public void indexLog(LogEntry logEntry) {
try {
logQueue.put(logEntry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while adding log to queue", e);
}
}
@Async
public void indexLogAsync(LogEntry logEntry) {
indexLog(logEntry);
}
public void indexLogsBulk(List<LogEntry> logEntries) {
if (logEntries.isEmpty()) {
return;
}
String indexName = getIndexNameForDate(LocalDate.now());
List<BulkOperation> operations = new ArrayList<>();
for (LogEntry logEntry : logEntries) {
operations.add(BulkOperation.of(op -> op
.index(idx -> idx
.index(indexName)
.id(logEntry.getId())
.document(logEntry)
)
));
}
executeBulkOperations(operations);
}
public SearchResponse<LogEntry> searchLogs(String query, int from, int size) throws IOException {
String indexName = getIndexPatternForCurrentMonth();
return client.search(s -> s
.index(indexName)
.from(from)
.size(size)
.query(q -> q
.bool(b -> b
.should(sh -> sh
.match(m -> m
.field("message")
.query(query)
)
)
.should(sh -> sh
.wildcard(w -> w
.field("message")
.value("*" + query + "*")
)
)
)
)
.sort(so -> so
.field(f -> f
.field("timestamp")
.order(co.elastic.clients.elasticsearch._types.SortOrder.Desc)
)
),
LogEntry.class
);
}
public SearchResponse<LogEntry> searchLogsByLevel(String level, Instant from, Instant to, int size) throws IOException {
String indexPattern = getIndexPatternForDateRange(from, to);
return client.search(s -> s
.index(indexPattern)
.size(size)
.query(q -> q
.bool(b -> b
.must(m -> m
.term(t -> t
.field("level")
.value(level)
)
)
.must(m -> m
.range(r -> r
.field("timestamp")
.gte(co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery._DESERIALIZER)
.lte(co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery._DESERIALIZER)
)
)
)
)
.sort(so -> so
.field(f -> f
.field("timestamp")
.order(co.elastic.clients.elasticsearch._types.SortOrder.Desc)
)
),
LogEntry.class
);
}
public Map<String, Object> getClusterHealth() throws IOException {
return client.cluster().health(h -> h).toJson().asMap();
}
public Map<String, Object> getIndexStats(String indexPattern) throws IOException {
return client.indices().stats(s -> s.index(indexPattern)).toJson().asMap();
}
// Background Processing
private void processQueue() {
List<LogEntry> batch = new ArrayList<>(bulkBatchSize);
while (!Thread.currentThread().isInterrupted()) {
try {
// Wait for the first element
LogEntry logEntry = logQueue.take();
batch.add(logEntry);
// Drain remaining elements up to batch size
logQueue.drainTo(batch, bulkBatchSize - 1);
// Process the batch
processBatch(batch);
batch.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Error processing log batch", e);
errorCount.incrementAndGet();
}
}
}
@Scheduled(fixedRateString = "${elasticsearch.bulk.flush-interval:5000}")
public void flushQueue() {
List<LogEntry> batch = new ArrayList<>(bulkBatchSize);
logQueue.drainTo(batch, bulkBatchSize);
if (!batch.isEmpty()) {
processBatch(batch);
}
}
private void processBatch(List<LogEntry> batch) {
if (batch.isEmpty()) {
return;
}
// Group by date for index routing
Map<String, List<LogEntry>> logsByDate = new HashMap<>();
for (LogEntry logEntry : batch) {
String indexName = getIndexNameForDate(
LocalDate.from(logEntry.getTimestamp())
);
logsByDate.computeIfAbsent(indexName, k -> new ArrayList<>()).add(logEntry);
}
// Process each date group
for (Map.Entry<String, List<LogEntry>> entry : logsByDate.entrySet()) {
String indexName = entry.getKey();
List<LogEntry> dateBatch = entry.getValue();
try {
createIndexIfNotExists(indexName);
indexBatchToIndex(indexName, dateBatch);
processedCount.addAndGet(dateBatch.size());
} catch (Exception e) {
logger.error("Failed to index batch to index: " + indexName, e);
errorCount.addAndGet(dateBatch.size());
// Fallback: try individual indexing
for (LogEntry logEntry : dateBatch) {
try {
indexSingleLog(logEntry);
} catch (Exception ex) {
logger.error("Failed to index individual log: " + logEntry.getId(), ex);
}
}
}
}
}
private void indexBatchToIndex(String indexName, List<LogEntry> logEntries) {
List<BulkOperation> operations = new ArrayList<>();
for (LogEntry logEntry : logEntries) {
operations.add(BulkOperation.of(op -> op
.index(idx -> idx
.index(indexName)
.id(logEntry.getId())
.document(logEntry)
)
));
}
executeBulkOperations(operations);
}
private void executeBulkOperations(List<BulkOperation> operations) {
if (operations.isEmpty()) {
return;
}
try {
BulkResponse response = client.bulk(b -> b
.operations(operations)
.refresh(co.elastic.clients.elasticsearch._types.Refresh.WaitFor)
);
// Check for errors
if (response.errors()) {
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
logger.error("Bulk indexing error for {}: {}", item.id(), item.error().reason());
}
}
}
} catch (IOException e) {
throw new RuntimeException("Bulk indexing failed", e);
}
}
private void indexSingleLog(LogEntry logEntry) throws IOException {
String indexName = getIndexNameForDate(LocalDate.from(logEntry.getTimestamp()));
IndexResponse response = client.index(i -> i
.index(indexName)
.id(logEntry.getId())
.document(logEntry)
.refresh(co.elastic.clients.elasticsearch._types.Refresh.WaitFor)
);
if (!"created".equals(response.result().jsonValue())) {
logger.warn("Unexpected index result: {}", response.result().jsonValue());
}
}
// Index Management
private void createIndexIfNotExists(String indexName) throws IOException {
BooleanResponse exists = client.indices().exists(e -> e.index(indexName));
if (!exists.value()) {
CreateIndexResponse response = client.indices().create(c -> c
.index(indexName)
.settings(s -> s
.numberOfShards(String.valueOf(numberOfShards))
.numberOfReplicas(String.valueOf(numberOfReplicas))
)
.mappings(m -> m
.properties("timestamp", p -> p
.date(d -> d.format("strict_date_optional_time||epoch_millis"))
)
.properties("level", p -> p
.keyword(k -> k)
)
.properties("message", p -> p
.text(t -> t
.analyzer("log_message_analyzer")
.fields("keyword", f -> f.keyword(k -> k))
)
)
.properties("serviceName", p -> p
.keyword(k -> k)
)
.properties("application", p -> p
.keyword(k -> k)
)
.properties("environment", p -> p
.keyword(k -> k)
)
)
);
if (response.acknowledged()) {
logger.info("Created index: {}", indexName);
} else {
logger.warn("Index creation not acknowledged for: {}", indexName);
}
}
}
// Index naming utilities
private String getIndexNameForDate(LocalDate date) {
return indexPrefix + "-" + date.format(INDEX_DATE_FORMAT);
}
private String getIndexPatternForCurrentMonth() {
LocalDate now = LocalDate.now();
return indexPrefix + "-" + now.getYear() + "." +
String.format("%02d", now.getMonthValue()) + ".*";
}
private String getIndexPatternForDateRange(Instant from, Instant to) {
LocalDate fromDate = LocalDate.from(from);
LocalDate toDate = LocalDate.from(to);
if (fromDate.equals(toDate)) {
return getIndexNameForDate(fromDate);
}
return indexPrefix + "-*";
}
// Monitoring and Statistics
public Map<String, Object> getStatistics() {
Map<String, Object> stats = new HashMap<>();
stats.put("queueSize", logQueue.size());
stats.put("processedCount", processedCount.get());
stats.put("errorCount", errorCount.get());
stats.put("throughput", calculateThroughput());
return stats;
}
private double calculateThroughput() {
// Implementation for throughput calculation
return 0.0;
}
// Cleanup old indices
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
public void cleanupOldIndices() {
try {
LocalDate retentionDate = LocalDate.now().minusDays(30); // 30 days retention
String[] allIndices = client.indices().get(g -> g.index(indexPrefix + "-*")).result().keySet().toArray(new String[0]);
for (String index : allIndices) {
LocalDate indexDate = extractDateFromIndexName(index);
if (indexDate != null && indexDate.isBefore(retentionDate)) {
client.indices().delete(d -> d.index(index));
logger.info("Deleted old index: {}", index);
}
}
} catch (Exception e) {
logger.error("Error during index cleanup", e);
}
}
private LocalDate extractDateFromIndexName(String indexName) {
try {
String datePart = indexName.substring(indexPrefix.length() + 1);
return LocalDate.parse(datePart, INDEX_DATE_FORMAT);
} catch (Exception e) {
return null;
}
}
}
Log4j2 Appender for Elasticsearch
package com.elasticsearch.logging.appender;
import com.elasticsearch.logging.model.LogEntry;
import com.elasticsearch.logging.service.ElasticsearchLogService;
import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.*;
import org.apache.logging.log4j.core.layout.PatternLayout;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Plugin(name = "ElasticsearchAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
public class ElasticsearchAppender extends AbstractAppender {

    /** Injected by Spring after startup; events are dropped until then. */
    private static ElasticsearchLogService logService;

    /** Process-wide context (application, environment, hostname, ...). */
    private static final Map<String, String> CONTEXT_MAP = new ConcurrentHashMap<>();

    private ElasticsearchAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions) {
        super(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY);
    }

    /** Log4j2 plugin factory, referenced by name from log4j2.xml. */
    @PluginFactory
    public static ElasticsearchAppender createAppender(
        @PluginAttribute("name") String name,
        @PluginElement("Filter") Filter filter,
        @PluginElement("Layout") Layout<? extends Serializable> layout,
        @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
        if (name == null) {
            LOGGER.error("No name provided for ElasticsearchAppender");
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        return new ElasticsearchAppender(name, filter, layout, ignoreExceptions);
    }

    /** Bridges the static appender to the Spring-managed service. */
    public static void setLogService(ElasticsearchLogService service) {
        logService = service;
    }

    public static void addContext(String key, String value) {
        CONTEXT_MAP.put(key, value);
    }

    public static void removeContext(String key) {
        CONTEXT_MAP.remove(key);
    }

    @Override
    public void append(LogEvent event) {
        if (logService == null) {
            LOGGER.warn("ElasticsearchLogService not initialized for ElasticsearchAppender");
            return;
        }
        try {
            LogEntry logEntry = convertToLogEntry(event);
            logService.indexLogAsync(logEntry);
        } catch (Exception e) {
            LOGGER.error("Failed to send log to Elasticsearch", e);
        }
    }

    /**
     * Maps a Log4j2 event onto a LogEntry document.
     *
     * BUG FIXES vs. the original:
     * - LogEntry.Builder has no thread()/timestamp() methods, so the original
     *   did not compile; thread name and timestamp (epoch millis converted to
     *   an Instant) are now applied via setters after build();
     * - CONTEXT_MAP was shared directly by every entry; each entry now gets
     *   its own snapshot so later context changes cannot mutate documents
     *   that are already queued.
     */
    private LogEntry convertToLogEntry(LogEvent event) {
        LogEntry entry = LogEntry.builder()
            .level(event.getLevel().name())
            .logger(event.getLoggerName())
            .message(event.getMessage().getFormattedMessage())
            .context(new java.util.HashMap<>(CONTEXT_MAP))
            .exception(event.getThrown())
            .build();
        entry.setThread(event.getThreadName());
        entry.setTimestamp(java.time.Instant.ofEpochMilli(event.getInstant().getEpochMillisecond()));
        return entry;
    }
}
Spring Boot Configuration
package com.elasticsearch.logging.config;
import com.elasticsearch.logging.appender.ElasticsearchAppender;
import com.elasticsearch.logging.service.ElasticsearchLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class LogAppenderConfig {

    // NOTE(review): with Spring Boot 3.x this class must import
    // jakarta.annotation.PostConstruct — the javax.annotation variant is no
    // longer on the classpath and the hook would silently never run.

    @Autowired
    private ElasticsearchLogService elasticsearchLogService;

    /**
     * Wires the Spring-managed log service into the static Log4j2 appender
     * and seeds process-wide context values.
     */
    @PostConstruct
    public void initializeAppender() {
        ElasticsearchAppender.setLogService(elasticsearchLogService);
        ElasticsearchAppender.addContext("application", "my-spring-app");
        // BUG FIX: System.getenv may return null and the ConcurrentHashMap
        // backing the context rejects null values (NPE). Fall back to a
        // placeholder when the profile variable is unset.
        String activeProfile = System.getenv("SPRING_PROFILES_ACTIVE");
        ElasticsearchAppender.addContext("environment", activeProfile != null ? activeProfile : "unknown");
        ElasticsearchAppender.addContext("hostname", getHostname());
    }

    /** Local hostname, or "unknown" when resolution fails. */
    private String getHostname() {
        try {
            return java.net.InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            return "unknown";
        }
    }
}
REST Controller for Log Management
package com.elasticsearch.logging.controller;
import com.elasticsearch.logging.model.LogEntry;
import com.elasticsearch.logging.service.ElasticsearchLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/logs")
public class LogController {

    @Autowired
    private ElasticsearchLogService logService;

    /** Queues a single entry for asynchronous indexing. */
    @PostMapping
    public ResponseEntity<String> addLog(@RequestBody LogEntry logEntry) {
        logService.indexLog(logEntry);
        return ResponseEntity.ok("Log accepted");
    }

    /** Synchronously bulk-indexes a list of entries. */
    @PostMapping("/bulk")
    public ResponseEntity<String> addLogsBulk(@RequestBody List<LogEntry> logEntries) {
        logService.indexLogsBulk(logEntries);
        return ResponseEntity.ok("Logs accepted");
    }

    /** Paged full-text search over the current month's indices. */
    @GetMapping("/search")
    public ResponseEntity<Object> searchLogs(
        @RequestParam String query,
        @RequestParam(defaultValue = "0") int from,
        @RequestParam(defaultValue = "50") int size) {
        try {
            return ResponseEntity.ok(logService.searchLogs(query, from, size));
        } catch (IOException e) {
            // BUG FIX: an Elasticsearch I/O failure is a server-side problem,
            // not a malformed request — report 500 instead of 400.
            return ResponseEntity.internalServerError().body("Search failed: " + e.getMessage());
        }
    }

    /** Level-filtered search; defaults to the last hour when no range given. */
    @GetMapping("/search/level/{level}")
    public ResponseEntity<Object> searchLogsByLevel(
        @PathVariable String level,
        @RequestParam(required = false) Instant from,
        @RequestParam(required = false) Instant to,
        @RequestParam(defaultValue = "100") int size) {
        try {
            if (from == null) from = Instant.now().minusSeconds(3600); // Default: last hour
            if (to == null) to = Instant.now();
            return ResponseEntity.ok(logService.searchLogsByLevel(level, from, to, size));
        } catch (IOException e) {
            // BUG FIX: server-side failure — 500, not 400.
            return ResponseEntity.internalServerError().body("Search failed: " + e.getMessage());
        }
    }

    /** Queue depth and lifetime indexing counters. */
    @GetMapping("/statistics")
    public ResponseEntity<Map<String, Object>> getStatistics() {
        return ResponseEntity.ok(logService.getStatistics());
    }

    /** Raw Elasticsearch cluster health. */
    @GetMapping("/health")
    public ResponseEntity<Object> getClusterHealth() {
        try {
            return ResponseEntity.ok(logService.getClusterHealth());
        } catch (IOException e) {
            // BUG FIX: server-side failure — 500, not 400.
            return ResponseEntity.internalServerError().body("Health check failed: " + e.getMessage());
        }
    }
}
Usage Examples
Application Configuration
# application.yml
# Connection settings for the Elasticsearch cluster.
elasticsearch:
host: localhost
port: 9200
scheme: http
username: elastic
# Resolved from the ELASTIC_PASSWORD env var; falls back to "changeme".
password: ${ELASTIC_PASSWORD:changeme}
# Daily index naming and sizing (indices are named <prefix>-yyyy.MM.dd).
index:
prefix: myapp-logs
shards: 3
replicas: 1
# Bulk indexing: max documents per request and scheduled flush period (ms).
bulk:
batch-size: 1000
flush-interval: 5000
logging:
config: classpath:log4j2.xml
Log4j2 Configuration
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<!-- Human-readable console output -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<!-- Custom appender (see ElasticsearchAppender plugin) that ships events
     to Elasticsearch asynchronously via ElasticsearchLogService -->
<ElasticsearchAppender name="Elasticsearch">
<PatternLayout pattern="%msg"/>
</ElasticsearchAppender>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="Elasticsearch"/>
</Root>
<!-- additivity="false" prevents double-logging through the root logger -->
<Logger name="com.elasticsearch" level="debug" additivity="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="Elasticsearch"/>
</Logger>
</Loggers>
</Configuration>
Spring Boot Application
package com.elasticsearch.logging;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableAsync // required for @Async log submission in ElasticsearchLogService
@EnableScheduling // required for the @Scheduled flush and index-cleanup jobs
public class LogIndexingApplication {
// Application entry point: boots the Spring context, which wires the
// Elasticsearch client, the log service, the appender bridge and the REST API.
public static void main(String[] args) {
SpringApplication.run(LogIndexingApplication.class, args);
}
}
Example Usage in Service
package com.elasticsearch.logging.example;
import com.elasticsearch.logging.model.LogEntry;
import com.elasticsearch.logging.service.ElasticsearchLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class BusinessService {

    private static final Logger logger = LoggerFactory.getLogger(BusinessService.class);

    @Autowired
    private ElasticsearchLogService elasticsearchLogService;

    /**
     * Processes a single order, emitting both conventional Log4j2 logs
     * (captured by the Elasticsearch appender) and explicit structured
     * LogEntry documents at each stage.
     */
    public void processOrder(String orderId) {
        // Standard logging (will be captured by Log4j2 appender)
        logger.info("Processing order: {}", orderId);
        try {
            // Structured "started" document with trace and business context.
            elasticsearchLogService.indexLog(startEntryFor(orderId));

            // Business logic
            validateOrder(orderId);
            processPayment(orderId);
            updateInventory(orderId);

            // Structured "success" document.
            elasticsearchLogService.indexLog(successEntryFor(orderId));
        } catch (Exception e) {
            // Structured error document including the exception details.
            elasticsearchLogService.indexLog(failureEntryFor(orderId, e));
            logger.error("Order processing failed for order: {}", orderId, e);
        }
    }

    // Builds the structured "processing started" document.
    private LogEntry startEntryFor(String orderId) {
        return LogEntry.builder()
            .level("INFO")
            .logger(getClass().getName())
            .message("Started order processing")
            .traceId(generateTraceId())
            .spanId(generateSpanId())
            .application("order-service")
            .environment("production")
            .addContext("orderId", orderId)
            .addContext("customerId", "cust-123")
            .addContext("amount", "150.00")
            .build();
    }

    // Builds the structured "processed successfully" document.
    private LogEntry successEntryFor(String orderId) {
        return LogEntry.builder()
            .level("INFO")
            .message("Order processed successfully")
            .addContext("orderId", orderId)
            .addContext("processingTime", "150ms")
            .build();
    }

    // Builds the structured error document carrying the thrown exception.
    private LogEntry failureEntryFor(String orderId, Exception e) {
        return LogEntry.builder()
            .level("ERROR")
            .message("Order processing failed")
            .addContext("orderId", orderId)
            .exception(e)
            .build();
    }

    private void validateOrder(String orderId) {
        // Validation logic
        logger.debug("Validating order: {}", orderId);
    }

    private void processPayment(String orderId) {
        // Payment processing logic
        logger.info("Processing payment for order: {}", orderId);
    }

    private void updateInventory(String orderId) {
        // Inventory update logic
        logger.debug("Updating inventory for order: {}", orderId);
    }

    private String generateTraceId() {
        return java.util.UUID.randomUUID().toString();
    }

    private String generateSpanId() {
        return java.util.UUID.randomUUID().toString().substring(0, 8);
    }
}
Key Features
- High-Performance Bulk Indexing: Asynchronous batch processing with configurable batch sizes
- Automatic Index Management: Daily index rotation with configurable retention
- Structured Logging: Rich context and metadata support
- Multiple Authentication Methods: Support for username/password and API key authentication
- SSL/TLS Support: Secure communication with Elasticsearch cluster
- Comprehensive Monitoring: Real-time statistics and health checks
- Log4j2 Integration: Seamless integration with existing logging frameworks
- Advanced Search Capabilities: Flexible querying with date ranges and filters
- Error Handling & Fallback: Robust error handling with per-document fallback indexing when a bulk request fails
- Spring Boot Integration: Native Spring configuration and dependency injection
This implementation provides a production-ready Elasticsearch logging solution that can handle high-volume log data while providing excellent search performance and operational visibility.