Elasticsearch Trace Store in Java: Distributed Tracing Implementation

Elasticsearch is a strong backend for storing and analyzing distributed traces: spans are small JSON documents, and Elasticsearch's full-text search, aggregations, and time-based index management map well onto trace workloads. In this article, we'll build a complete Elasticsearch-backed trace store that handles span storage, querying, and analysis for distributed systems.

Project Setup

First, add the necessary dependencies to your pom.xml. Note that the high-level REST client used here is deprecated in favor of the newer Elasticsearch Java API client, but it remains a common choice for 7.x clusters:

<dependencies>
<!-- Elasticsearch Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.12</version>
</dependency>
<!-- OpenTelemetry API (Optional) -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.31.0</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.13.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<!-- Metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.11.3</version>
</dependency>
<!-- Cache -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.6</version>
</dependency>
</dependencies>

Core Implementation

1. Configuration Models

ElasticsearchConfig.java - Main configuration class

import java.time.Duration;
import java.util.List;
public class ElasticsearchConfig {
private final List<String> hosts;
private final String scheme;
private final int port;
private final String username;
private final String password;
private final Duration connectTimeout;
private final Duration socketTimeout;
private final int maxConnections;
private final int maxConnectionsPerRoute;
private final String indexPrefix;
private final boolean compressionEnabled;
private ElasticsearchConfig(Builder builder) {
this.hosts = builder.hosts;
this.scheme = builder.scheme;
this.port = builder.port;
this.username = builder.username;
this.password = builder.password;
this.connectTimeout = builder.connectTimeout;
this.socketTimeout = builder.socketTimeout;
this.maxConnections = builder.maxConnections;
this.maxConnectionsPerRoute = builder.maxConnectionsPerRoute;
this.indexPrefix = builder.indexPrefix;
this.compressionEnabled = builder.compressionEnabled;
}
public static class Builder {
private List<String> hosts = List.of("localhost");
private String scheme = "http";
private int port = 9200;
private String username;
private String password;
private Duration connectTimeout = Duration.ofSeconds(10);
private Duration socketTimeout = Duration.ofSeconds(30);
private int maxConnections = 100;
private int maxConnectionsPerRoute = 100;
private String indexPrefix = "traces";
private boolean compressionEnabled = true;
public Builder hosts(List<String> hosts) {
this.hosts = hosts;
return this;
}
public Builder scheme(String scheme) {
this.scheme = scheme;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder credentials(String username, String password) {
this.username = username;
this.password = password;
return this;
}
public Builder connectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
public Builder socketTimeout(Duration socketTimeout) {
this.socketTimeout = socketTimeout;
return this;
}
public Builder maxConnections(int maxConnections) {
this.maxConnections = maxConnections;
return this;
}
public Builder maxConnectionsPerRoute(int maxConnectionsPerRoute) {
this.maxConnectionsPerRoute = maxConnectionsPerRoute;
return this;
}
public Builder indexPrefix(String indexPrefix) {
this.indexPrefix = indexPrefix;
return this;
}
public Builder compressionEnabled(boolean compressionEnabled) {
this.compressionEnabled = compressionEnabled;
return this;
}
public ElasticsearchConfig build() {
return new ElasticsearchConfig(this);
}
}
// Getters
public List<String> getHosts() { return hosts; }
public String getScheme() { return scheme; }
public int getPort() { return port; }
public String getUsername() { return username; }
public String getPassword() { return password; }
public Duration getConnectTimeout() { return connectTimeout; }
public Duration getSocketTimeout() { return socketTimeout; }
public int getMaxConnections() { return maxConnections; }
public int getMaxConnectionsPerRoute() { return maxConnectionsPerRoute; }
public String getIndexPrefix() { return indexPrefix; }
public boolean isCompressionEnabled() { return compressionEnabled; }
}

2. Trace Data Models

Span.java - Represents a single span in a trace

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class Span {
private final String traceId;
private final String spanId;
private final String parentSpanId;
private final String name;
private final String kind;
private final Instant startTime;
private final Instant endTime;
private final long durationMs;
private final Map<String, String> attributes;
private final Map<String, String> resource;
private final int statusCode;
private final String statusMessage;
private final Map<String, String> events;
private Span(Builder builder) {
this.traceId = builder.traceId;
this.spanId = builder.spanId;
this.parentSpanId = builder.parentSpanId;
this.name = builder.name;
this.kind = builder.kind;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
this.durationMs = builder.durationMs;
this.attributes = Map.copyOf(builder.attributes);
this.resource = Map.copyOf(builder.resource);
this.statusCode = builder.statusCode;
this.statusMessage = builder.statusMessage;
this.events = Map.copyOf(builder.events);
}
public static class Builder {
private String traceId;
private String spanId;
private String parentSpanId;
private String name;
private String kind = "INTERNAL";
private Instant startTime;
private Instant endTime;
private long durationMs;
private Map<String, String> attributes = new HashMap<>();
private Map<String, String> resource = new HashMap<>();
private int statusCode = 0; // 0=Unset, 1=Ok, 2=Error
private String statusMessage;
private Map<String, String> events = new HashMap<>();
public Builder traceId(String traceId) {
this.traceId = traceId;
return this;
}
public Builder spanId(String spanId) {
this.spanId = spanId;
return this;
}
public Builder parentSpanId(String parentSpanId) {
this.parentSpanId = parentSpanId;
return this;
}
public Builder name(String name) {
this.name = name;
return this;
}
public Builder kind(String kind) {
this.kind = kind;
return this;
}
public Builder startTime(Instant startTime) {
this.startTime = startTime;
return this;
}
public Builder endTime(Instant endTime) {
this.endTime = endTime;
return this;
}
public Builder attribute(String key, String value) {
this.attributes.put(key, value);
return this;
}
public Builder attributes(Map<String, String> attributes) {
this.attributes.putAll(attributes);
return this;
}
public Builder resource(String key, String value) {
this.resource.put(key, value);
return this;
}
public Builder resourceAttributes(Map<String, String> resource) {
this.resource.putAll(resource);
return this;
}
public Builder statusCode(int statusCode) {
this.statusCode = statusCode;
return this;
}
public Builder statusMessage(String statusMessage) {
this.statusMessage = statusMessage;
return this;
}
public Builder event(String name, String attributes) {
this.events.put(name, attributes);
return this;
}
public Span build() {
Objects.requireNonNull(traceId, "traceId is required");
Objects.requireNonNull(spanId, "spanId is required");
Objects.requireNonNull(name, "name is required");
Objects.requireNonNull(startTime, "startTime is required");
Objects.requireNonNull(endTime, "endTime is required");
// Compute the duration here, after null checks, so that the order in
// which startTime(...) and endTime(...) are called doesn't matter
this.durationMs = endTime.toEpochMilli() - startTime.toEpochMilli();
return new Span(this);
}
}
// Getters
public String getTraceId() { return traceId; }
public String getSpanId() { return spanId; }
public String getParentSpanId() { return parentSpanId; }
public String getName() { return name; }
public String getKind() { return kind; }
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public long getDurationMs() { return durationMs; }
public Map<String, String> getAttributes() { return attributes; }
public Map<String, String> getResource() { return resource; }
public int getStatusCode() { return statusCode; }
public String getStatusMessage() { return statusMessage; }
public Map<String, String> getEvents() { return events; }
public boolean isRoot() {
return parentSpanId == null || parentSpanId.isEmpty();
}
public boolean hasError() {
return statusCode == 2; // Error status
}
}
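The Span model leaves ID generation to the caller. If you need to mint IDs yourself, W3C Trace Context-style identifiers (a 16-byte trace ID and an 8-byte span ID, rendered as lowercase hex) can be produced with a small helper. This is a sketch; the IdGenerator class name is ours and is not part of the store:

```java
import java.security.SecureRandom;

public class IdGenerator {
    private static final SecureRandom RANDOM = new SecureRandom();

    // W3C Trace Context: 16-byte trace id -> 32 hex chars
    public static String newTraceId() {
        return randomHex(16);
    }

    // W3C Trace Context: 8-byte span id -> 16 hex chars
    public static String newSpanId() {
        return randomHex(8);
    }

    private static String randomHex(int numBytes) {
        byte[] bytes = new byte[numBytes];
        RANDOM.nextBytes(bytes);
        StringBuilder sb = new StringBuilder(numBytes * 2);
        for (byte b : bytes) {
            // Mask to an int so negative bytes format as two hex digits
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }
}
```

(An all-zero ID is technically invalid per the spec, but the probability of generating one here is negligible.)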

Trace.java - Represents a complete trace with multiple spans

import java.time.Instant;
import java.util.*;
public class Trace {
private final String traceId;
private final List<Span> spans;
private final Instant startTime;
private final Instant endTime;
private final long durationMs;
private final Map<String, String> serviceMap;
private final boolean hasError;
private final int totalSpans;
public Trace(String traceId, List<Span> spans) {
this.traceId = traceId;
this.spans = new ArrayList<>(spans);
this.totalSpans = spans.size();
// Calculate trace timing
this.startTime = spans.stream()
.map(Span::getStartTime)
.min(Instant::compareTo)
.orElse(Instant.now());
this.endTime = spans.stream()
.map(Span::getEndTime)
.max(Instant::compareTo)
.orElse(Instant.now());
this.durationMs = endTime.toEpochMilli() - startTime.toEpochMilli();
// Build service map
this.serviceMap = new HashMap<>();
spans.forEach(span -> {
String serviceName = span.getResource().get("service.name");
if (serviceName != null) {
serviceMap.putIfAbsent(serviceName, serviceName);
}
});
this.hasError = spans.stream().anyMatch(Span::hasError);
}
// Getters
public String getTraceId() { return traceId; }
public List<Span> getSpans() { return Collections.unmodifiableList(spans); }
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public long getDurationMs() { return durationMs; }
public Map<String, String> getServiceMap() { return Collections.unmodifiableMap(serviceMap); }
public boolean hasError() { return hasError; }
public int getTotalSpans() { return totalSpans; }
public Optional<Span> getRootSpan() {
return spans.stream()
.filter(Span::isRoot)
.findFirst();
}
public List<Span> getSpansByService(String serviceName) {
return spans.stream()
.filter(span -> serviceName.equals(span.getResource().get("service.name")))
.toList();
}
}
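The timing roll-up in the Trace constructor (earliest span start to latest span end) is worth understanding on its own: the trace's duration is not the sum of span durations, because spans overlap. A standalone sketch of the same min/max logic over plain start/end pairs (TraceTiming is an illustrative name):

```java
import java.time.Instant;
import java.util.List;

public class TraceTiming {
    // Each element is a [start, end] pair for one span.
    public static long traceDurationMs(List<Instant[]> spanWindows) {
        // Trace start = earliest span start
        Instant start = spanWindows.stream()
                .map(w -> w[0])
                .min(Instant::compareTo)
                .orElseThrow();
        // Trace end = latest span end
        Instant end = spanWindows.stream()
                .map(w -> w[1])
                .max(Instant::compareTo)
                .orElseThrow();
        return end.toEpochMilli() - start.toEpochMilli();
    }
}
```

For spans covering 0–100 ms and 50–250 ms, the trace window is 0–250 ms, so the duration is 250 ms even though the spans together account for 300 ms of work.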

3. Elasticsearch Client Wrapper

ElasticsearchClient.java - Wrapper around Elasticsearch REST client

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ElasticsearchClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchClient.class);
private final RestHighLevelClient client;
private final ElasticsearchConfig config;
public ElasticsearchClient(ElasticsearchConfig config) {
this.config = config;
this.client = createClient();
}
private RestHighLevelClient createClient() {
HttpHost[] hosts = config.getHosts().stream()
.map(host -> new HttpHost(host, config.getPort(), config.getScheme()))
.toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts)
.setRequestConfigCallback(requestConfigBuilder -> 
requestConfigBuilder
.setConnectTimeout((int) config.getConnectTimeout().toMillis())
.setSocketTimeout((int) config.getSocketTimeout().toMillis()))
.setHttpClientConfigCallback(httpClientBuilder -> {
if (config.getUsername() != null && config.getPassword() != null) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(config.getUsername(), config.getPassword())
);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
});
return new RestHighLevelClient(builder);
}
public boolean indexDocument(String index, String id, String document) {
try {
IndexRequest request = new IndexRequest(index)
.id(id)
.source(document, XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
return true;
} catch (IOException e) {
logger.error("Failed to index document in {}: {}", index, e.getMessage());
return false;
}
}
public boolean bulkIndexDocuments(String index, List<BulkDocument> documents) {
if (documents.isEmpty()) {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
for (BulkDocument doc : documents) {
IndexRequest request = new IndexRequest(index)
.id(doc.getId())
.source(doc.getDocument(), XContentType.JSON);
bulkRequest.add(request);
}
try {
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
logger.error("Bulk indexing failed: {}", response.buildFailureMessage());
return false;
}
return true;
} catch (IOException e) {
logger.error("Failed to bulk index documents: {}", e.getMessage());
return false;
}
}
public List<String> searchDocuments(String index, SearchSourceBuilder searchSource) {
List<String> results = new ArrayList<>();
try {
SearchRequest searchRequest = new SearchRequest(index)
.source(searchSource);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit hit : response.getHits().getHits()) {
results.add(hit.getSourceAsString());
}
} catch (IOException e) {
logger.error("Search failed for index {}: {}", index, e.getMessage());
}
return results;
}
public long countDocuments(String index, SearchSourceBuilder searchSource) {
try {
SearchRequest searchRequest = new SearchRequest(index)
.source(searchSource.size(0)); // We only need the count
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
return response.getHits().getTotalHits().value; // Note: capped at 10,000 by default unless trackTotalHits is enabled
} catch (IOException e) {
logger.error("Count failed for index {}: {}", index, e.getMessage());
return -1;
}
}
public boolean createIndex(String index, String mapping) {
try {
if (client.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT)) {
return true; // Index already exists
}
CreateIndexRequest request = new CreateIndexRequest(index)
.mapping(mapping, XContentType.JSON);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
return response.isAcknowledged();
} catch (IOException e) {
logger.error("Failed to create index {}: {}", index, e.getMessage());
return false;
}
}
@Override
public void close() throws IOException {
if (client != null) {
client.close();
}
}
public static class BulkDocument {
private final String id;
private final String document;
public BulkDocument(String id, String document) {
this.id = id;
this.document = document;
}
public String getId() { return id; }
public String getDocument() { return document; }
}
}
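Very large bulk requests can exceed Elasticsearch's http.max_content_length limit, so callers often split their documents into bounded batches before invoking bulkIndexDocuments. A generic partitioning helper might look like this (a sketch; Batcher is not part of the client above):

```java
import java.util.ArrayList;
import java.util.List;

public class Batcher {
    // Split a list into consecutive batches of at most batchSize elements.
    // The returned batches are subList views, so treat them as read-only.
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }
}
```

Each sub-batch can then be passed to bulkIndexDocuments in turn, keeping individual requests well under the cluster's payload limit.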

4. Trace Store Implementation

ElasticsearchTraceStore.java - Main trace storage and retrieval service

import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
public class ElasticsearchTraceStore {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchTraceStore.class);
private final ElasticsearchClient esClient;
private final ObjectMapper objectMapper;
private final ElasticsearchConfig config;
private final TraceIndexManager indexManager;
public ElasticsearchTraceStore(ElasticsearchConfig config) {
this.config = config;
this.esClient = new ElasticsearchClient(config);
this.objectMapper = new ObjectMapper();
this.indexManager = new TraceIndexManager(esClient, config);
}
public void storeSpan(Span span) {
storeSpans(List.of(span));
}
public void storeSpans(List<Span> spans) {
if (spans.isEmpty()) {
return;
}
String indexName = indexManager.getCurrentIndex();
List<ElasticsearchClient.BulkDocument> documents = new ArrayList<>();
for (Span span : spans) {
try {
String document = convertSpanToDocument(span);
String documentId = generateDocumentId(span);
documents.add(new ElasticsearchClient.BulkDocument(documentId, document));
} catch (Exception e) {
logger.error("Failed to convert span to document: {}", e.getMessage());
}
}
boolean success = esClient.bulkIndexDocuments(indexName, documents);
if (!success) {
logger.error("Failed to store {} spans to Elasticsearch", spans.size());
} else {
logger.debug("Successfully stored {} spans to {}", spans.size(), indexName);
}
}
public Optional<Trace> getTrace(String traceId) {
String indexPattern = config.getIndexPrefix() + "-*";
SearchSourceBuilder searchSource = new SearchSourceBuilder()
.query(QueryBuilders.termQuery("traceId", traceId)) // traceId is mapped as keyword, so no .keyword subfield
.size(1000); // Limit to prevent overly large traces
List<String> spanDocuments = esClient.searchDocuments(indexPattern, searchSource);
if (spanDocuments.isEmpty()) {
return Optional.empty();
}
List<Span> spans = spanDocuments.stream()
.map(this::convertDocumentToSpan)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (spans.isEmpty()) {
return Optional.empty();
}
return Optional.of(new Trace(traceId, spans));
}
public List<TraceSummary> findTraces(TraceQuery query) {
String indexPattern = config.getIndexPrefix() + "-*";
SearchSourceBuilder searchSource = buildTraceQuery(query);
List<String> spanDocuments = esClient.searchDocuments(indexPattern, searchSource);
// Group spans by traceId
Map<String, List<Span>> traces = spanDocuments.stream()
.map(this::convertDocumentToSpan)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(Span::getTraceId));
// Convert to trace summaries
return traces.entrySet().stream()
.map(entry -> new TraceSummary(entry.getKey(), entry.getValue()))
.sorted(Comparator.comparing(TraceSummary::getStartTime).reversed())
.limit(query.getLimit())
.collect(Collectors.toList());
}
public Map<String, Long> getServiceStatistics(Instant startTime, Instant endTime) {
String indexPattern = config.getIndexPrefix() + "-*";
BoolQueryBuilder query = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery("startTime")
.gte(startTime.toEpochMilli())
.lte(endTime.toEpochMilli()));
// A terms aggregation on serviceName would be far more efficient here;
// for simplicity, we fetch matching spans and aggregate client-side.
SearchSourceBuilder fullSearch = new SearchSourceBuilder()
.query(query)
.size(10000); // Hard cap; spans beyond this are not counted
List<String> documents = esClient.searchDocuments(indexPattern, fullSearch);
return documents.stream()
.map(this::convertDocumentToSpan)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(
span -> span.getResource().getOrDefault("service.name", "unknown"),
Collectors.counting()
));
}
public List<Span> findSlowSpans(Instant startTime, Instant endTime, long minDurationMs, int limit) {
String indexPattern = config.getIndexPrefix() + "-*";
BoolQueryBuilder query = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery("startTime")
.gte(startTime.toEpochMilli())
.lte(endTime.toEpochMilli()))
.must(QueryBuilders.rangeQuery("durationMs").gte(minDurationMs));
SearchSourceBuilder searchSource = new SearchSourceBuilder()
.query(query)
.sort(SortBuilders.fieldSort("durationMs").order(SortOrder.DESC))
.size(limit);
return esClient.searchDocuments(indexPattern, searchSource).stream()
.map(this::convertDocumentToSpan)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private SearchSourceBuilder buildTraceQuery(TraceQuery query) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// Time range
boolQuery.must(QueryBuilders.rangeQuery("startTime")
.gte(query.getStartTime().toEpochMilli())
.lte(query.getEndTime().toEpochMilli()));
// Service filter (serviceName is denormalized onto each span document at index time)
if (query.getServiceName() != null && !query.getServiceName().isEmpty()) {
boolQuery.must(QueryBuilders.termQuery("serviceName", query.getServiceName()));
}
// Operation filter (use the keyword subfield so the wildcard matches the whole name)
if (query.getOperationName() != null && !query.getOperationName().isEmpty()) {
boolQuery.must(QueryBuilders.wildcardQuery("name.keyword", "*" + query.getOperationName() + "*"));
}
// Duration filter
if (query.getMinDurationMs() > 0) {
boolQuery.must(QueryBuilders.rangeQuery("durationMs").gte(query.getMinDurationMs()));
}
if (query.getMaxDurationMs() > 0) {
boolQuery.must(QueryBuilders.rangeQuery("durationMs").lte(query.getMaxDurationMs()));
}
// Error filter
if (query.isOnlyErrors()) {
boolQuery.must(QueryBuilders.termQuery("statusCode", 2));
}
// Attributes filter
if (query.getAttributes() != null) {
query.getAttributes().forEach((key, value) -> 
boolQuery.must(QueryBuilders.termQuery("attributes." + key + ".keyword", value)));
}
return new SearchSourceBuilder()
.query(boolQuery)
.sort(SortBuilders.fieldSort("startTime").order(SortOrder.DESC))
.size(query.getLimit() * 10); // Get more spans to group into traces
}
private String convertSpanToDocument(Span span) {
try {
Map<String, Object> document = new HashMap<>();
// Basic span information
document.put("traceId", span.getTraceId());
document.put("spanId", span.getSpanId());
document.put("parentSpanId", span.getParentSpanId());
document.put("name", span.getName());
document.put("kind", span.getKind());
document.put("startTime", span.getStartTime().toEpochMilli());
document.put("endTime", span.getEndTime().toEpochMilli());
document.put("durationMs", span.getDurationMs());
document.put("statusCode", span.getStatusCode());
document.put("statusMessage", span.getStatusMessage());
// Attributes and resource
document.put("attributes", span.getAttributes());
document.put("resource", span.getResource());
document.put("events", span.getEvents());
// Derived fields for better querying
document.put("isRoot", span.isRoot());
document.put("hasError", span.hasError());
document.put("serviceName", span.getResource().get("service.name"));
document.put("timestamp", span.getStartTime().toEpochMilli()); // Epoch millis; a raw Instant would not serialize cleanly without the Jackson JavaTimeModule
return objectMapper.writeValueAsString(document);
} catch (Exception e) {
throw new RuntimeException("Failed to convert span to document", e);
}
}
private Span convertDocumentToSpan(String document) {
try {
Map<String, Object> map = objectMapper.readValue(document, Map.class);
Span.Builder builder = new Span.Builder()
.traceId((String) map.get("traceId"))
.spanId((String) map.get("spanId"))
.name((String) map.get("name"))
.kind((String) map.get("kind"))
.startTime(Instant.ofEpochMilli(((Number) map.get("startTime")).longValue()))
.endTime(Instant.ofEpochMilli(((Number) map.get("endTime")).longValue()));
// Optional fields
if (map.containsKey("parentSpanId")) {
builder.parentSpanId((String) map.get("parentSpanId"));
}
if (map.containsKey("statusCode")) {
builder.statusCode(((Number) map.get("statusCode")).intValue());
}
if (map.containsKey("statusMessage")) {
builder.statusMessage((String) map.get("statusMessage"));
}
// Attributes and resource
if (map.containsKey("attributes")) {
@SuppressWarnings("unchecked")
Map<String, String> attributes = (Map<String, String>) map.get("attributes");
builder.attributes(attributes);
}
if (map.containsKey("resource")) {
@SuppressWarnings("unchecked")
Map<String, String> resource = (Map<String, String>) map.get("resource");
builder.resourceAttributes(resource);
}
return builder.build();
} catch (Exception e) {
logger.error("Failed to convert document to span: {}", e.getMessage());
return null;
}
}
private String generateDocumentId(Span span) {
return span.getTraceId() + "_" + span.getSpanId();
}
public void close() throws IOException {
esClient.close();
}
}
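For reference, a findTraces call filtered to one service with a minimum duration roughly corresponds to this Elasticsearch request body (the epoch-millis timestamps and the checkout service name are illustrative, and the field names follow the span mapping defined in the next section):

```json
{
  "query": {
    "bool": {
      "must": [
        { "range": { "startTime": { "gte": 1700000000000, "lte": 1700003600000 } } },
        { "term":  { "serviceName": "checkout" } },
        { "range": { "durationMs": { "gte": 500 } } }
      ]
    }
  },
  "sort": [ { "startTime": { "order": "desc" } } ],
  "size": 1000
}
```

Seeing the raw DSL is useful when debugging: the same body can be sent directly to traces-*/_search via curl or Kibana Dev Tools to check what the store's query builders actually match.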

5. Index Management

TraceIndexManager.java - Manages Elasticsearch indices for traces

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class TraceIndexManager {
private static final Logger logger = LoggerFactory.getLogger(TraceIndexManager.class);
private static final DateTimeFormatter INDEX_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy.MM.dd");
private final ElasticsearchClient esClient;
private final ElasticsearchConfig config;
public TraceIndexManager(ElasticsearchClient esClient, ElasticsearchConfig config) {
this.esClient = esClient;
this.config = config;
}
public String getCurrentIndex() {
return config.getIndexPrefix() + "-" + LocalDate.now().format(INDEX_FORMATTER);
}
public String getIndexForTimestamp(Instant timestamp) {
LocalDate date = timestamp.atZone(ZoneId.systemDefault()).toLocalDate();
return config.getIndexPrefix() + "-" + date.format(INDEX_FORMATTER);
}
public boolean initializeIndices() {
// The composable index template below should be registered once via the
// _index_template API so that future daily indices pick up the mapping
// automatically; here we only create today's index directly.
String template = createIndexTemplate();
logger.debug("Index template for {}-*: {}", config.getIndexPrefix(), template);
String currentIndex = getCurrentIndex();
String mapping = createIndexMapping();
logger.info("Initializing Elasticsearch indices for trace storage");
logger.info("Current index: {}", currentIndex);
return esClient.createIndex(currentIndex, mapping);
}
private String createIndexTemplate() {
return """
{
"index_patterns": ["%s-*"],
"template": {
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"refresh_interval": "30s"
},
"mappings": %s
}
}
""".formatted(config.getIndexPrefix(), createIndexMapping());
}
private String createIndexMapping() {
return """
{
"dynamic": "strict",
"properties": {
"traceId": {
"type": "keyword"
},
"spanId": {
"type": "keyword"
},
"parentSpanId": {
"type": "keyword"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"kind": {
"type": "keyword"
},
"startTime": {
"type": "date"
},
"endTime": {
"type": "date"
},
"durationMs": {
"type": "long"
},
"statusCode": {
"type": "integer"
},
"statusMessage": {
"type": "text"
},
"isRoot": {
"type": "boolean"
},
"hasError": {
"type": "boolean"
},
"serviceName": {
"type": "keyword"
},
"attributes": {
"type": "object",
"dynamic": true
},
"resource": {
"type": "object",
"dynamic": true
},
"events": {
"type": "object",
"dynamic": true
},
"timestamp": {
"type": "date"
}
}
}
""";
}
}
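The daily index naming used by getCurrentIndex and getIndexForTimestamp is easy to verify in isolation. Here is a standalone version of the same derivation (IndexNaming is an illustrative name):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class IndexNaming {
    private static final DateTimeFormatter INDEX_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Daily index name: "<prefix>-yyyy.MM.dd", matching the "<prefix>-*"
    // pattern used by the index template and by query-time index patterns.
    public static String indexFor(String prefix, LocalDate date) {
        return prefix + "-" + date.format(INDEX_FORMATTER);
    }
}
```

One index per day keeps retention cheap: expiring old traces is a matter of deleting whole indices rather than running delete-by-query over a single large one.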

6. Query Models

TraceQuery.java - Query parameters for trace search

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
public class TraceQuery {
private final Instant startTime;
private final Instant endTime;
private final String serviceName;
private final String operationName;
private final long minDurationMs;
private final long maxDurationMs;
private final boolean onlyErrors;
private final Map<String, String> attributes;
private final int limit;
private TraceQuery(Builder builder) {
this.startTime = builder.startTime;
this.endTime = builder.endTime;
this.serviceName = builder.serviceName;
this.operationName = builder.operationName;
this.minDurationMs = builder.minDurationMs;
this.maxDurationMs = builder.maxDurationMs;
this.onlyErrors = builder.onlyErrors;
this.attributes = Map.copyOf(builder.attributes);
this.limit = builder.limit;
}
public static class Builder {
private Instant startTime;
private Instant endTime;
private String serviceName;
private String operationName;
private long minDurationMs;
private long maxDurationMs;
private boolean onlyErrors;
private Map<String, String> attributes = new HashMap<>();
private int limit = 100;
public Builder timeRange(Instant startTime, Instant endTime) {
this.startTime = startTime;
this.endTime = endTime;
return this;
}
public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}
public Builder operationName(String operationName) {
this.operationName = operationName;
return this;
}
public Builder minDurationMs(long minDurationMs) {
this.minDurationMs = minDurationMs;
return this;
}
public Builder maxDurationMs(long maxDurationMs) {
this.maxDurationMs = maxDurationMs;
return this;
}
public Builder onlyErrors(boolean onlyErrors) {
this.onlyErrors = onlyErrors;
return this;
}
public Builder attribute(String key, String value) {
this.attributes.put(key, value);
return this;
}
public Builder limit(int limit) {
this.limit = limit;
return this;
}
public TraceQuery build() {
if (startTime == null) {
startTime = Instant.now().minusSeconds(3600); // Default: last hour
}
if (endTime == null) {
endTime = Instant.now();
}
return new TraceQuery(this);
}
}
// Getters
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public String getServiceName() { return serviceName; }
public String getOperationName() { return operationName; }
public long getMinDurationMs() { return minDurationMs; }
public long getMaxDurationMs() { return maxDurationMs; }
public boolean isOnlyErrors() { return onlyErrors; }
public Map<String, String> getAttributes() { return attributes; }
public int getLimit() { return limit; }
}

TraceSummary.java - Summary view of a trace

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class TraceSummary {
private final String traceId;
private final Instant startTime;
private final Instant endTime;
private final long durationMs;
private final Map<String, String> services;
private final int totalSpans;
private final boolean hasError;
private final String rootService;
public TraceSummary(String traceId, List<Span> spans) {
this.traceId = traceId;
this.totalSpans = spans.size();
this.hasError = spans.stream().anyMatch(Span::hasError);
this.startTime = spans.stream()
.map(Span::getStartTime)
.min(Instant::compareTo)
.orElse(Instant.now());
this.endTime = spans.stream()
.map(Span::getEndTime)
.max(Instant::compareTo)
.orElse(Instant.now());
this.durationMs = endTime.toEpochMilli() - startTime.toEpochMilli();
this.services = spans.stream()
.map(span -> span.getResource().get("service.name"))
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toMap(name -> name, name -> name));
this.rootService = spans.stream()
.filter(Span::isRoot)
.findFirst()
.map(span -> span.getResource().get("service.name"))
.orElse("unknown");
}
// Getters
public String getTraceId() { return traceId; }
public Instant getStartTime() { return startTime; }
public Instant getEndTime() { return endTime; }
public long getDurationMs() { return durationMs; }
public Map<String, String> getServices() { return services; }
public int getTotalSpans() { return totalSpans; }
public boolean hasError() { return hasError; }
public String getRootService() { return rootService; }
}
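
The aggregation logic in the constructor above can be checked in isolation. This standalone sketch uses raw Instant windows instead of the Span class (which is hypothetical here) to show how the earliest start and latest end across all spans yield the trace duration:

```java
import java.time.Instant;
import java.util.List;

public class TraceDurationSketch {
    public static void main(String[] args) {
        Instant base = Instant.parse("2024-01-01T00:00:00Z");
        // Hypothetical span windows as {startOffsetMs, endOffsetMs} pairs
        List<long[]> windows = List.of(
                new long[]{0, 400},
                new long[]{100, 900},
                new long[]{200, 600});
        // Trace start = earliest span start; trace end = latest span end
        Instant start = windows.stream().map(w -> base.plusMillis(w[0]))
                .min(Instant::compareTo).orElseThrow();
        Instant end = windows.stream().map(w -> base.plusMillis(w[1]))
                .max(Instant::compareTo).orElseThrow();
        long durationMs = end.toEpochMilli() - start.toEpochMilli();
        System.out.println(durationMs); // 900: from offset 0 to offset 900
    }
}
```

Note that the trace duration (900ms here) is not the sum of span durations, since spans overlap; it is the wall-clock window covered by the trace.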

7. Batch Span Processor

BatchSpanProcessor.java - Processes spans in batches for efficiency

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class BatchSpanProcessor implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(BatchSpanProcessor.class);
private final ElasticsearchTraceStore traceStore;
private final int batchSize;
private final Duration batchTimeout;
private final BlockingQueue<Span> spanQueue;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;
private final List<Span> currentBatch;
public BatchSpanProcessor(ElasticsearchTraceStore traceStore, int batchSize, Duration batchTimeout) {
this.traceStore = traceStore;
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.spanQueue = new LinkedBlockingQueue<>(10000);
this.scheduler = Executors.newSingleThreadScheduledExecutor();
this.running = new AtomicBoolean(true);
this.currentBatch = new ArrayList<>(batchSize);
startProcessor();
}
public void addSpan(Span span) {
if (!running.get()) {
logger.warn("Processor is stopped, ignoring span");
return;
}
if (!spanQueue.offer(span)) {
logger.warn("Span queue is full, dropping span from trace: {}", span.getTraceId());
}
}
private void startProcessor() {
// Start batch processor thread
Thread processorThread = new Thread(this::processSpans, "span-processor");
processorThread.setDaemon(true);
processorThread.start();
// Start scheduled flusher
scheduler.scheduleAtFixedRate(this::flushBatch, 
batchTimeout.toMillis(), batchTimeout.toMillis(), TimeUnit.MILLISECONDS);
logger.info("Started batch span processor with batchSize={}, timeout={}ms", 
batchSize, batchTimeout.toMillis());
}
private void processSpans() {
while (running.get() || !spanQueue.isEmpty()) {
try {
Span span = spanQueue.poll(100, TimeUnit.MILLISECONDS);
if (span != null) {
boolean batchFull;
// currentBatch is shared with the scheduled flusher thread, so guard it
synchronized (this) {
currentBatch.add(span);
batchFull = currentBatch.size() >= batchSize;
}
if (batchFull) {
flushBatch();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
// Final flush
flushBatch();
}
private void flushBatch() {
List<Span> batchToSend;
// Snapshot and clear under the same lock used when adding spans
synchronized (this) {
if (currentBatch.isEmpty()) {
return;
}
batchToSend = new ArrayList<>(currentBatch);
currentBatch.clear();
}
try {
traceStore.storeSpans(batchToSend);
logger.debug("Flushed batch of {} spans to Elasticsearch", batchToSend.size());
} catch (Exception e) {
logger.error("Failed to flush span batch: {}", e.getMessage());
// In production, implement retry logic or a dead letter queue
}
}
@Override
public void close() {
if (running.compareAndSet(true, false)) {
logger.info("Shutting down batch span processor");
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
// Process remaining spans
processSpans();
}
}
}
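
The batching pattern above can be exercised without Elasticsearch. This self-contained sketch (my simplification, batching strings instead of spans and recording flushes in memory instead of indexing them) demonstrates the core queue-drain-flush loop; unlike the full processor, it has no timed flusher, so no extra locking is needed here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class MiniBatcher implements AutoCloseable {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
    private final List<List<String>> flushed = new CopyOnWriteArrayList<>();
    private final List<String> batch = new ArrayList<>();
    private final int batchSize;
    private final Thread worker;
    private volatile boolean running = true;

    public MiniBatcher(int batchSize) {
        this.batchSize = batchSize;
        this.worker = new Thread(this::run, "mini-batcher");
        this.worker.start();
    }

    public void add(String item) { queue.offer(item); }

    private void run() {
        // Keep draining until stopped AND the queue is empty
        while (running || !queue.isEmpty()) {
            try {
                String item = queue.poll(10, TimeUnit.MILLISECONDS);
                if (item != null) {
                    batch.add(item);
                    if (batch.size() >= batchSize) flush();
                }
            } catch (InterruptedException e) {
                break;
            }
        }
        flush(); // final flush on shutdown
    }

    private void flush() {
        if (batch.isEmpty()) return;
        flushed.add(new ArrayList<>(batch)); // stand-in for storeSpans(...)
        batch.clear();
    }

    public List<List<String>> getFlushed() { return flushed; }

    @Override public void close() throws InterruptedException {
        running = false;
        worker.join(); // wait for the drain-and-flush to complete
    }

    public static void main(String[] args) throws Exception {
        MiniBatcher b = new MiniBatcher(3);
        for (int i = 0; i < 7; i++) b.add("span-" + i);
        b.close();
        int total = b.getFlushed().stream().mapToInt(List::size).sum();
        System.out.println(total); // all 7 items reach storage across batches
    }
}
```

The important property, shared with BatchSpanProcessor, is that close() guarantees no accepted items are lost: the worker drains the queue after the stop flag is set and performs a final flush.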

8. Demonstration and Usage

TraceStoreDemo.java - Complete demonstration

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TraceStoreDemo {
public static void main(String[] args) {
try {
// Example 1: Basic setup and span storage
basicExample();
// Example 2: Querying and analysis
queryExample();
// Example 3: Batch processing
batchProcessingExample();
} catch (Exception e) {
System.err.println("Demo failed: " + e.getMessage());
e.printStackTrace();
}
}
private static void basicExample() {
System.out.println("=== Basic Trace Storage Example ===");
ElasticsearchConfig config = new ElasticsearchConfig.Builder()
.hosts(List.of("localhost"))
.port(9200)
.indexPrefix("traces")
.build();
try (ElasticsearchTraceStore traceStore = new ElasticsearchTraceStore(config)) {
// Initialize indices
traceStore.initializeIndices();
// Create sample spans
Span rootSpan = new Span.Builder()
.traceId("trace-123")
.spanId("span-1")
.name("HTTP GET /api/users")
.kind("SERVER")
.startTime(Instant.now().minusSeconds(10))
.endTime(Instant.now().minusSeconds(9))
.resource("service.name", "user-service")
.resource("deployment.environment", "production")
.attribute("http.method", "GET")
.attribute("http.route", "/api/users")
.attribute("http.status_code", "200")
.statusCode(1) // OK
.build();
Span childSpan = new Span.Builder()
.traceId("trace-123")
.spanId("span-2")
.parentSpanId("span-1")
.name("DB SELECT users")
.kind("CLIENT")
.startTime(Instant.now().minusSeconds(9).plusMillis(100))
.endTime(Instant.now().minusSeconds(9).plusMillis(500))
.resource("service.name", "user-service")
.resource("deployment.environment", "production")
.attribute("db.system", "postgresql")
.attribute("db.operation", "SELECT")
.statusCode(1)
.build();
// Store spans
traceStore.storeSpans(List.of(rootSpan, childSpan));
System.out.println("Stored 2 spans for trace: trace-123");
// Retrieve trace
Thread.sleep(2000); // Wait for indexing
Optional<Trace> trace = traceStore.getTrace("trace-123");
if (trace.isPresent()) {
System.out.println("Retrieved trace with " + trace.get().getTotalSpans() + " spans");
System.out.println("Duration: " + trace.get().getDurationMs() + "ms");
System.out.println("Services: " + trace.get().getServiceMap().keySet());
}
} catch (Exception e) {
System.err.println("Error in basic example: " + e.getMessage());
}
}
private static void queryExample() {
System.out.println("\n=== Trace Query Example ===");
ElasticsearchConfig config = new ElasticsearchConfig.Builder()
.hosts(List.of("localhost"))
.port(9200)
.indexPrefix("traces")
.build();
try (ElasticsearchTraceStore traceStore = new ElasticsearchTraceStore(config)) {
// Query for traces in the last hour
TraceQuery query = new TraceQuery.Builder()
.timeRange(Instant.now().minusSeconds(3600), Instant.now())
.serviceName("user-service")
.minDurationMs(100)
.limit(10)
.build();
List<TraceSummary> traces = traceStore.findTraces(query);
System.out.println("Found " + traces.size() + " traces matching query");
for (TraceSummary trace : traces) {
System.out.printf("Trace %s: %d spans, %dms, services=%s%n",
trace.getTraceId().substring(0, 8),
trace.getTotalSpans(),
trace.getDurationMs(),
trace.getServices().keySet());
}
// Find slow spans
List<Span> slowSpans = traceStore.findSlowSpans(
Instant.now().minusSeconds(3600),
Instant.now(),
1000, // 1 second
5
);
System.out.println("\nSlow spans (>1000ms):");
for (Span span : slowSpans) {
System.out.printf("  %s: %s (%dms)%n",
span.getName(),
span.getResource().get("service.name"),
span.getDurationMs());
}
} catch (Exception e) {
System.err.println("Error in query example: " + e.getMessage());
}
}
private static void batchProcessingExample() {
System.out.println("\n=== Batch Processing Example ===");
ElasticsearchConfig config = new ElasticsearchConfig.Builder()
.hosts(List.of("localhost"))
.port(9200)
.indexPrefix("traces")
.build();
try (ElasticsearchTraceStore traceStore = new ElasticsearchTraceStore(config);
BatchSpanProcessor processor = new BatchSpanProcessor(traceStore, 100, 
java.time.Duration.ofSeconds(5))) {
// Simulate high-volume span generation
ScheduledExecutorService generator = Executors.newScheduledThreadPool(1);
generator.scheduleAtFixedRate(() -> {
Span span = new Span.Builder()
.traceId("trace-" + System.currentTimeMillis())
.spanId("span-" + System.nanoTime())
.name("sample-operation")
.kind("INTERNAL")
.startTime(Instant.now().minusMillis(100))
.endTime(Instant.now())
.resource("service.name", "test-service")
.attribute("generated", "true")
.build();
processor.addSpan(span);
}, 0, 10, TimeUnit.MILLISECONDS); // 100 spans per second
// Run for 10 seconds
Thread.sleep(10000);
generator.shutdown();
generator.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("Batch processing example completed");
} catch (Exception e) {
System.err.println("Error in batch processing example: " + e.getMessage());
}
}
}

Configuration Examples

application.yml

elasticsearch:
  hosts:
    - "es-node-1.example.com"
    - "es-node-2.example.com"
  scheme: "https"
  port: 9200
  username: "elastic"
  password: "${ES_PASSWORD}"
  indexPrefix: "traces"
  connectTimeout: "10s"
  socketTimeout: "30s"
  maxConnections: 100
  compressionEnabled: true

tracing:
  batch:
    size: 500
    timeout: "5s"
  retention:
    days: 30

Best Practices

  1. Index Management: Use daily indices with proper retention policies
  2. Mapping Design: Define strict mappings for better performance
  3. Bulk Operations: Use batch processing for high-volume spans
  4. Error Handling: Implement retry logic and dead letter queues
  5. Monitoring: Track indexing rate, latency, and error rates
  6. Security: Use TLS and authentication for production clusters
  7. Performance: Tune Elasticsearch settings for your workload

This implementation provides a solid foundation for an Elasticsearch-backed trace store that can handle high-volume distributed tracing data while offering powerful querying and analysis capabilities. Before running it in production, harden it with the practices above: retries and dead letter handling for failed bulk writes, TLS and authentication, and index lifecycle management for retention.

Java Observability, Logging Intelligence & AI-Driven Monitoring (APM, Tracing, Logs & Anomaly Detection)

https://macronepal.com/blog/beyond-metrics-observing-serverless-and-traditional-java-applications-with-thundra-apm/
Explains using Thundra APM to observe both serverless and traditional Java applications by combining tracing, metrics, and logs into a unified observability platform for faster debugging and performance insights.

https://macronepal.com/blog/dynatrace-oneagent-in-java-2/
Explains Dynatrace OneAgent for Java, which automatically instruments JVM applications to capture metrics, traces, and logs, enabling full-stack monitoring and root-cause analysis with minimal configuration.

https://macronepal.com/blog/lightstep-java-sdk-distributed-tracing-and-observability-implementation/
Explains Lightstep Java SDK for distributed tracing, helping developers track requests across microservices and identify latency issues using OpenTelemetry-based observability.

https://macronepal.com/blog/honeycomb-io-beeline-for-java-complete-guide-2/
Explains Honeycomb Beeline for Java, which provides high-cardinality observability and deep query capabilities to understand complex system behavior and debug distributed systems efficiently.

https://macronepal.com/blog/lumigo-for-serverless-in-java-complete-distributed-tracing-guide-2/
Explains Lumigo for Java serverless applications, offering automatic distributed tracing, log correlation, and error tracking to simplify debugging in cloud-native environments.

https://macronepal.com/blog/from-noise-to-signals-implementing-log-anomaly-detection-in-java-applications/
Explains how to detect anomalies in Java logs using behavioral patterns and machine learning techniques to separate meaningful incidents from noisy log data and improve incident response.

https://macronepal.com/blog/ai-powered-log-analysis-in-java-from-reactive-debugging-to-proactive-insights/
Explains AI-driven log analysis for Java applications, shifting from manual debugging to predictive insights that identify issues early and improve system reliability using intelligent log processing.

https://macronepal.com/blog/titliel-java-logging-best-practices/
Explains best practices for Java logging, focusing on structured logs, proper log levels, performance optimization, and ensuring logs are useful for debugging and observability systems.

https://macronepal.com/blog/seeking-a-loguru-for-java-the-quest-for-elegant-and-simple-logging/
Explains the search for simpler, more elegant logging frameworks in Java, comparing modern logging approaches that aim to reduce complexity while improving readability and developer experience.
