Jaeger gRPC Storage Integration in Java

Jaeger is a distributed tracing system for monitoring and troubleshooting microservices-based distributed systems. This guide shows how a Java application can call a Jaeger gRPC storage plugin directly to store and query trace data. The storage plugin is the gRPC API that Jaeger uses to read and write spans in a pluggable storage backend.

Why Use Jaeger gRPC Storage?

  • Unified Tracing: Collect traces across multiple services
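  • gRPC Performance: Protobuf over HTTP/2 keeps storage calls compact, and large query results are streamed in chunks
  • Flexible Backends: Any storage that implements the gRPC storage API can be plugged in, alongside Jaeger's built-in backends such as Cassandra and Elasticsearch (Kafka serves as an ingestion buffer, not a queryable store)
  • Rich Querying: Find traces by service, operation, tags, time range, and duration, and read service dependency links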
  • Microservices Ready: Designed for distributed systems

Prerequisites

  • A reachable server that implements Jaeger's gRPC storage plugin API (SpanWriterPlugin, SpanReaderPlugin, DependenciesReaderPlugin)
  • Java 11+
  • Maven/Gradle for dependency management

Step 1: Project Dependencies

Maven (pom.xml):

<dependencies>
<!-- gRPC Dependencies -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.53.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.53.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.53.0</version>
</dependency>
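<!-- Jaeger Proto Dependencies (assumption: this artifact must supply the api_v2 model and the storage
plugin gRPC stubs; if it does not, remove it and generate them from Jaeger's .proto files placed under
src/main/proto with the protobuf-maven-plugin configured below) -->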
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-proto</artifactId>
<version>1.8.1</version>
</dependency>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.22.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.22.2</version>
</dependency>
<!-- HTTP Client for REST fallback -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.0</version>
</dependency>
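<!-- Cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Bean Validation for @Valid (no longer included in spring-boot-starter-web since Boot 2.3) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Lombok for the @Data, @Builder and @Slf4j annotations used throughout -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>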
<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.10.5</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.22.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.53.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Step 2: Configuration Classes

@Configuration
@ConfigurationProperties(prefix = "jaeger.storage")
@Data
public class JaegerStorageConfig {
// gRPC Storage Plugin Configuration
private String grpcHost = "localhost";
private int grpcPort = 17271;
private boolean grpcTlsEnabled = false;
private String grpcTlsCertPath;
// Connection Settings
private long timeoutMs = 30000;
private int maxRetries = 3;
private int maxConcurrentCalls = 100;
private boolean keepAlive = true;
private long keepAliveTimeMs = 30000;
// Query Settings
private int maxTracesPerQuery = 100;
private Duration traceLookback = Duration.ofHours(24);
private boolean enableTraceCache = true;
private Duration traceCacheTtl = Duration.ofMinutes(10);
// Storage Backend Settings
private StorageBackend backend = StorageBackend.MEMORY;
private String cassandraContactPoints;
private String elasticsearchUrl;
private String kafkaBootstrapServers;
// Monitoring
private boolean enableMetrics = true;
private Duration metricsCollectionInterval = Duration.ofSeconds(30);
public enum StorageBackend {
MEMORY, CASSANDRA, ELASTICSEARCH, KAFKA, GRPC_PLUGIN
}
public String getGrpcEndpoint() {
return grpcHost + ":" + grpcPort;
}
}
@Component
@Slf4j
public class JaegerGrpcChannelFactory {
private final JaegerStorageConfig config;
private final Map<String, ManagedChannel> channels = new ConcurrentHashMap<>();
public JaegerGrpcChannelFactory(JaegerStorageConfig config) {
this.config = config;
}
public ManagedChannel createChannel() {
return createChannel(config.getGrpcEndpoint());
}
public ManagedChannel createChannel(String endpoint) {
return channels.computeIfAbsent(endpoint, this::buildChannel);
}
private ManagedChannel buildChannel(String endpoint) {
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoint);
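if (config.isGrpcTlsEnabled()) {
// Fail fast instead of silently downgrading to plaintext when TLS cannot be configured
try {
channelBuilder.useTransportSecurity().sslContext(buildSslContext());
} catch (SSLException e) {
throw new IllegalStateException("Failed to configure TLS for Jaeger storage endpoint " + endpoint, e);
}
} else {
channelBuilder.usePlaintext();
}
// Configure connection settings
if (config.isKeepAlive()) {
// Go gRPC servers reject pings more frequent than their keepalive enforcement policy
// (5 minutes by default, and no pings on idle connections), so align keepAliveTimeMs
// with the server's settings
channelBuilder
.keepAliveTime(config.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS)
.keepAliveWithoutCalls(true);
}
// enableRetry() only retries RPCs covered by a retryPolicy in the channel's service config;
// maxRetryAttempts() caps the number of attempts such a policy may request
channelBuilder
.enableRetry()
.maxRetryAttempts(config.getMaxRetries());
ManagedChannel channel = channelBuilder.build();
log.info("Created gRPC channel to Jaeger storage: {}", endpoint);
return channel;
}
private SslContext buildSslContext() throws SSLException {
SslContextBuilder sslBuilder = GrpcSslContexts.forClient();
if (config.getGrpcTlsCertPath() != null) {
// Trust the CA certificate(s) in this PEM file instead of the JVM's default trust store
sslBuilder.trustManager(new File(config.getGrpcTlsCertPath()));
}
return sslBuilder.build();
}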
@PreDestroy
public void shutdown() {
channels.values().forEach(ManagedChannel::shutdown);
log.info("Shutdown all Jaeger gRPC channels");
}
}
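Calling enableRetry() alone does not retry failed calls. A channel that should retry also needs a retry policy in its service config. The sketch below builds one in the nested Map/List structure that ManagedChannelBuilder.defaultServiceConfig(...) accepts. The class name StorageRetryConfig and the backoff values are illustrative. The sketch also assumes that the plugin services are declared in the jaeger.storage.v1 proto package; check that against the storage.proto you compile.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds a gRPC service config in the nested Map/List/Double form
// accepted by ManagedChannelBuilder.defaultServiceConfig(Map<String, ?>).
final class StorageRetryConfig {

    // ASSUMPTION: the plugin services live in proto package "jaeger.storage.v1";
    // verify this against the storage.proto you compile.
    static final List<String> SERVICES = List.of(
            "jaeger.storage.v1.SpanWriterPlugin",
            "jaeger.storage.v1.SpanReaderPlugin",
            "jaeger.storage.v1.DependenciesReaderPlugin");

    private StorageRetryConfig() {
    }

    // maxAttempts counts the original call too, so gRPC requires a value of at least 2.
    static Map<String, Object> build(int maxAttempts) {
        if (maxAttempts < 2) {
            throw new IllegalArgumentException("maxAttempts must be >= 2, got " + maxAttempts);
        }
        // Service-config numbers must be Doubles, and durations strings such as "0.5s".
        Map<String, Object> retryPolicy = Map.of(
                "maxAttempts", (double) maxAttempts,
                "initialBackoff", "0.5s",
                "maxBackoff", "30s",
                "backoffMultiplier", 2.0,
                "retryableStatusCodes", List.of("UNAVAILABLE"));

        // A "name" entry that lists only a service applies the policy to all of its methods.
        List<Map<String, Object>> names = SERVICES.stream()
                .map(service -> Map.<String, Object>of("service", service))
                .collect(Collectors.toList());

        Map<String, Object> methodConfig = Map.of("name", names, "retryPolicy", retryPolicy);
        return Map.of("methodConfig", List.of(methodConfig));
    }
}
```

You could wire it into buildChannel as channelBuilder.defaultServiceConfig(StorageRetryConfig.build(config.getMaxRetries() + 1)).enableRetry(). The builder's maxRetryAttempts(...) value still caps the effective maxAttempts.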

Step 3: Core Data Models

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TraceQuery {
private String serviceName;
private String operationName;
private Map<String, String> tags;
private Instant startTimeMin;
private Instant startTimeMax;
private Duration durationMin;
private Duration durationMax;
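// @Builder ignores plain field initializers; @Builder.Default keeps these defaults for builder-created queries
@Builder.Default
private int searchDepth = 20;
@Builder.Default
private int limit = 100;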
public boolean hasTimeRange() {
return startTimeMin != null && startTimeMax != null;
}
public boolean hasDurationFilter() {
return durationMin != null || durationMax != null;
}
public Map<String, String> getTags() {
return tags != null ? tags : Collections.emptyMap();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Trace {
private String traceId;
private List<Span> spans;
private Map<String, String> processes;
private Instant startTime;
private Duration duration;
private Map<String, Object> services;
private Map<String, Object> operations;
public Optional<Span> getRootSpan() {
return spans.stream()
.filter(span -> span.getParentSpanId() == null || span.getParentSpanId().isEmpty())
.findFirst();
}
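public List<Span> getSpansByService(String serviceName) {
return spans.stream()
.filter(span -> span.getProcess() != null && serviceName.equals(span.getProcess().getServiceName()))
.collect(Collectors.toList());
}
// Sums each span's full duration, so nested spans of the same service are counted more than once.
// groupingBy rejects null keys, so spans without a process are skipped.
public Map<String, Long> getServiceDurations() {
return spans.stream()
.filter(span -> span.getProcess() != null && span.getProcess().getServiceName() != null)
.collect(Collectors.groupingBy(
span -> span.getProcess().getServiceName(),
Collectors.summingLong(Span::getDuration)
));
}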
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Span {
private String traceId;
private String spanId;
private String parentSpanId;
private String operationName;
private SpanReference[] references;
private long startTime; // microseconds
private long duration; // microseconds
private Map<String, String> tags;
private SpanProcess process;
private SpanLog[] logs;
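public Instant getStartTimeInstant() {
// Keeps microsecond precision (Instant.ofEpochMilli(startTime / 1000) would truncate it)
return Instant.EPOCH.plus(startTime, ChronoUnit.MICROS);
}
public Duration getDurationAsDuration() {
// java.time.Duration has no ofMicros(); use the unit-based factory instead
return Duration.of(duration, ChronoUnit.MICROS);
}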
public boolean hasError() {
return tags != null && tags.entrySet().stream()
.anyMatch(entry -> "error".equals(entry.getKey()) && "true".equals(entry.getValue()));
}
public Optional<String> getTag(String key) {
return tags != null ? Optional.ofNullable(tags.get(key)) : Optional.empty();
}
}
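In these models, span timestamps and durations are epoch microseconds. java.time and protobuf's Timestamp, by contrast, work in seconds plus nanoseconds. A small helper can keep those conversions in one place and avoid the millisecond truncation of Instant.ofEpochMilli(micros / 1000). MicrosTime is a hypothetical name, not part of any Jaeger library:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for the epoch-microsecond values used by Span.startTime,
// Span.duration and SpanLog.timestamp.
final class MicrosTime {

    private MicrosTime() {
    }

    // Keeps sub-millisecond precision, unlike Instant.ofEpochMilli(micros / 1000).
    static Instant toInstant(long epochMicros) {
        return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    }

    // Inverse of toInstant; any nanoseconds below one microsecond are truncated.
    static long toEpochMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    // Seconds part for a protobuf Timestamp; floorDiv keeps the nanos part in
    // [0, 999_999_999] for pre-1970 values, as the Timestamp format requires.
    static long seconds(long micros) {
        return Math.floorDiv(micros, 1_000_000L);
    }

    // Nanoseconds part matching seconds(micros).
    static int nanos(long micros) {
        return (int) (Math.floorMod(micros, 1_000_000L) * 1_000L);
    }
}
```

For the non-negative values that spans normally carry, seconds(...) and nanos(...) return the same numbers as the writer's inline "/ 1_000_000" and "% 1_000_000 * 1_000" arithmetic.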
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SpanProcess {
private String serviceName;
private Map<String, String> tags;
public Optional<String> getTag(String key) {
return tags != null ? Optional.ofNullable(tags.get(key)) : Optional.empty();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SpanLog {
private long timestamp; // microseconds
private Map<String, String> fields;
public Instant getTimestampInstant() {
// Keeps microsecond precision instead of truncating to milliseconds
return Instant.EPOCH.plus(timestamp, ChronoUnit.MICROS);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SpanReference {
private String traceId;
private String spanId;
private ReferenceType refType;
public enum ReferenceType {
CHILD_OF, FOLLOWS_FROM
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TraceSearchResult {
private List<Trace> traces;
private int total;
private int limit;
private int offset;
private Map<String, Object> errors;
public static TraceSearchResult empty() {
return TraceSearchResult.builder()
.traces(Collections.emptyList())
.total(0)
.limit(0)
.offset(0)
.build();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DependencyLink {
private String parent;
private String child;
private long callCount;
private String source;
private Map<String, Object> stats;
public String getKey() {
return parent + "->" + child;
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ServiceOperation {
private String service;
private String operation;
private long spanCount;
private double avgDuration;
private double errorRate;
}
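The service and controller code throws JaegerStorageException and returns ErrorResponse, but this guide never defines either type. The shapes below are inferred from how the two types are called: a message with an optional cause, and an error code plus a message. Treat them as minimal placeholders. In a real project, make each one public and put it in its own file.

```java
// Unchecked, so service methods can throw it without "throws" clauses and
// controllers can catch it like any other runtime failure.
class JaegerStorageException extends RuntimeException {

    JaegerStorageException(String message) {
        super(message);
    }

    JaegerStorageException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical JSON error body for the REST controller; with public getters,
// Jackson renders it as {"code": ..., "message": ...}.
final class ErrorResponse {

    private final String code;
    private final String message;

    ErrorResponse(String code, String message) {
        this.code = code;
        this.message = message;
    }

    public String getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }
}
```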

Step 4: gRPC Service Clients

Span Writer Client
@Service
@Slf4j
public class JaegerSpanWriter {
private final JaegerStorageConfig config;
private final JaegerGrpcChannelFactory channelFactory;
private SpanWriterPluginGrpc.SpanWriterPluginBlockingStub blockingStub;
private SpanWriterPluginGrpc.SpanWriterPluginFutureStub futureStub;
public JaegerSpanWriter(JaegerStorageConfig config, JaegerGrpcChannelFactory channelFactory) {
this.config = config;
this.channelFactory = channelFactory;
initializeStubs();
}
// The api_v2 model uses fixed-width IDs: 16-byte trace IDs and 8-byte span IDs
private static final int TRACE_ID_BYTES = 16;
private static final int SPAN_ID_BYTES = 8;
// SpanWriterPlugin exposes a unary WriteSpan RPC whose request carries a single span,
// so a list of spans is written one call at a time
public void writeSpans(List<Span> spans) {
if (spans == null || spans.isEmpty()) {
return;
}
try {
for (Span span : spans) {
WriteSpanRequest request = WriteSpanRequest.newBuilder()
.setSpan(convertToProtoSpan(span))
.build();
blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.writeSpan(request);
}
} catch (Exception e) {
log.error("Failed to write {} spans to Jaeger storage", spans.size(), e);
throw new JaegerStorageException("Span write operation failed", e);
}
}
public CompletableFuture<Void> writeSpansAsync(List<Span> spans) {
if (spans == null || spans.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
try {
// Issue one WriteSpan call per span; the returned future completes when all calls
// have finished and fails if any of them fails
CompletableFuture<?>[] writes = spans.stream()
.map(span -> WriteSpanRequest.newBuilder().setSpan(convertToProtoSpan(span)).build())
.map(request -> convertToCompletableFuture(futureStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.writeSpan(request)))
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(writes);
} catch (Exception e) {
log.error("Failed to write {} spans asynchronously", spans.size(), e);
CompletableFuture<Void> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new JaegerStorageException("Async span write failed", e));
return failedFuture;
}
}
// Note: a retry rewrites the whole list, including spans that were already stored
public void writeSpansWithRetry(List<Span> spans, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
writeSpans(spans);
return;
} catch (Exception e) {
attempt++;
if (attempt >= maxRetries) {
throw e;
}
log.warn("Write attempt {} failed, retrying...", attempt);
exponentialBackoff(attempt);
}
}
}
private io.jaegertracing.api_v2.Span convertToProtoSpan(Span span) {
io.jaegertracing.api_v2.Span.Builder builder = io.jaegertracing.api_v2.Span.newBuilder()
.setTraceId(ByteString.copyFrom(hexToBytes(span.getTraceId(), TRACE_ID_BYTES)))
.setSpanId(ByteString.copyFrom(hexToBytes(span.getSpanId(), SPAN_ID_BYTES)))
.setOperationName(span.getOperationName())
.setStartTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(span.getStartTime() / 1_000_000)
.setNanos((int) ((span.getStartTime() % 1_000_000) * 1_000))
.build())
.setDuration(com.google.protobuf.Duration.newBuilder()
.setSeconds(span.getDuration() / 1_000_000)
.setNanos((int) ((span.getDuration() % 1_000_000) * 1_000))
.build());
// The api_v2 Span has no parent_span_id field: the parent is expressed as a CHILD_OF reference
if (span.getParentSpanId() != null && !span.getParentSpanId().isEmpty()) {
builder.addReferences(SpanRef.newBuilder()
.setTraceId(builder.getTraceId())
.setSpanId(ByteString.copyFrom(hexToBytes(span.getParentSpanId(), SPAN_ID_BYTES)))
.setRefType(SpanRefType.CHILD_OF)
.build());
}
// Add tags (all written as STRING values)
if (span.getTags() != null) {
span.getTags().forEach((key, value) ->
builder.addTags(KeyValue.newBuilder()
.setKey(key)
.setVStr(value)
.setVType(ValueType.STRING)
.build())
);
}
// Add process
if (span.getProcess() != null) {
builder.setProcess(convertToProtoProcess(span.getProcess()));
}
return builder.build();
}
private Process convertToProtoProcess(SpanProcess process) {
Process.Builder builder = Process.newBuilder()
.setServiceName(process.getServiceName());
if (process.getTags() != null) {
process.getTags().forEach((key, value) ->
builder.addTags(KeyValue.newBuilder()
.setKey(key)
.setVStr(value)
.setVType(ValueType.STRING)
.build())
);
}
return builder.build();
}
// Left-pads to the fixed ID width so shorter hex IDs (e.g. 64-bit trace IDs) are accepted
private byte[] hexToBytes(String hex, int byteLength) {
if (hex == null || hex.isEmpty() || hex.length() > byteLength * 2) {
throw new IllegalArgumentException("Invalid ID for " + byteLength + " bytes: " + hex);
}
String padded = "0".repeat(byteLength * 2 - hex.length()) + hex;
byte[] data = new byte[byteLength];
for (int i = 0; i < byteLength; i++) {
int high = Character.digit(padded.charAt(2 * i), 16);
int low = Character.digit(padded.charAt(2 * i + 1), 16);
if (high < 0 || low < 0) {
throw new IllegalArgumentException("Not a hex ID: " + hex);
}
data[i] = (byte) ((high << 4) | low);
}
return data;
}
private <T> CompletableFuture<T> convertToCompletableFuture(ListenableFuture<T> listenableFuture) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
Futures.addCallback(listenableFuture, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable t) {
completableFuture.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return completableFuture;
}
private void exponentialBackoff(int attempt) {
try {
Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Max 30 seconds
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JaegerStorageException("Write operation interrupted", e);
}
}
private void initializeStubs() {
ManagedChannel channel = channelFactory.createChannel();
this.blockingStub = SpanWriterPluginGrpc.newBlockingStub(channel);
this.futureStub = SpanWriterPluginGrpc.newFutureStub(channel);
}
}
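The storage plugin's api_v2 model carries fixed-width IDs: 16-byte trace IDs and 8-byte span IDs. Applications often receive those IDs as hex strings of varying length, for example 64-bit trace IDs from older clients. The standalone sketch below left-pads short IDs and rejects non-hex input. JaegerIds is an illustrative name, and the fixed widths are an assumption stated in its comments.

```java
// Hypothetical helper for Jaeger ID encoding. ASSUMPTION: the api_v2 model carries
// trace IDs as 16 bytes and span IDs as 8 bytes, big-endian.
final class JaegerIds {

    static final int TRACE_ID_BYTES = 16;
    static final int SPAN_ID_BYTES = 8;

    private JaegerIds() {
    }

    // Converts a hex ID to exactly byteLength bytes, left-padding with zeros so that
    // shorter IDs (64-bit trace IDs, or hex with a dropped leading zero) still fit.
    static byte[] hexToBytes(String hex, int byteLength) {
        if (hex == null || hex.isEmpty() || hex.length() > byteLength * 2) {
            throw new IllegalArgumentException("Invalid ID for " + byteLength + " bytes: " + hex);
        }
        String padded = "0".repeat(byteLength * 2 - hex.length()) + hex;
        byte[] out = new byte[byteLength];
        for (int i = 0; i < byteLength; i++) {
            int high = Character.digit(padded.charAt(2 * i), 16);
            int low = Character.digit(padded.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("Not a hex ID: " + hex);
            }
            out[i] = (byte) ((high << 4) | low);
        }
        return out;
    }

    // Lower-case hex, two characters per byte; leading zero bytes are kept.
    static String bytesToHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16))
               .append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }
}
```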
Span Reader Client
@Service
@Slf4j
public class JaegerSpanReader {
private final JaegerStorageConfig config;
private final JaegerGrpcChannelFactory channelFactory;
private SpanReaderPluginGrpc.SpanReaderPluginBlockingStub blockingStub;
public JaegerSpanReader(JaegerStorageConfig config, JaegerGrpcChannelFactory channelFactory) {
this.config = config;
this.channelFactory = channelFactory;
initializeStub();
}
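// Caching requires @EnableCaching on a configuration class; misses (null) are not cached
@Cacheable(value = "traces", key = "#traceId", unless = "#result == null")
public Trace getTrace(String traceId) {
try {
GetTraceRequest request = GetTraceRequest.newBuilder()
.setTraceId(ByteString.copyFrom(hexToBytes(traceId)))
.build();
// GetTrace is server-streaming: a large trace may arrive split across several chunks
Iterator<SpansResponseChunk> chunks = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getTrace(request);
List<io.jaegertracing.api_v2.Span> protoSpans = new ArrayList<>();
while (chunks.hasNext()) {
protoSpans.addAll(chunks.next().getSpansList());
}
return convertToTrace(protoSpans);
} catch (StatusRuntimeException e) {
// Assumption: the plugin reports an unknown trace ID with the NOT_FOUND status code
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
return null;
}
log.error("Failed to get trace: {}", traceId, e);
throw new JaegerStorageException("Trace retrieval failed", e);
} catch (Exception e) {
log.error("Failed to get trace: {}", traceId, e);
throw new JaegerStorageException("Trace retrieval failed", e);
}
}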
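public TraceSearchResult findTraces(TraceQuery query) {
try {
FindTracesRequest request = buildFindTracesRequest(query);
Iterator<SpansResponseChunk> responseStream = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.findTraces(request);
// A trace can be split across several chunks, so group spans by trace ID (keeping
// arrival order) instead of treating each chunk as one trace. The stream is drained
// fully because breaking out of a blocking iterator early does not cancel the call.
Map<ByteString, List<io.jaegertracing.api_v2.Span>> spansByTrace = new LinkedHashMap<>();
while (responseStream.hasNext()) {
for (io.jaegertracing.api_v2.Span protoSpan : responseStream.next().getSpansList()) {
spansByTrace.computeIfAbsent(protoSpan.getTraceId(), id -> new ArrayList<>()).add(protoSpan);
}
}
List<Trace> traces = spansByTrace.values().stream()
.map(this::convertToTrace)
.limit(query.getLimit())
.collect(Collectors.toList());
return TraceSearchResult.builder()
.traces(traces)
.total(traces.size())
.limit(query.getLimit())
.offset(0)
.build();
} catch (Exception e) {
log.error("Failed to find traces for query: {}", query, e);
throw new JaegerStorageException("Trace search failed", e);
}
}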
public List<String> getServices() {
try {
GetServicesRequest request = GetServicesRequest.newBuilder().build();
GetServicesResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getServices(request);
return response.getServicesList();
} catch (Exception e) {
log.error("Failed to get services list", e);
throw new JaegerStorageException("Services retrieval failed", e);
}
}
public List<String> getOperations(String serviceName) {
try {
GetOperationsRequest request = GetOperationsRequest.newBuilder()
.setService(serviceName)
.build();
GetOperationsResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getOperations(request);
return response.getOperationsList().stream()
.map(Operation::getName)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get operations for service: {}", serviceName, e);
throw new JaegerStorageException("Operations retrieval failed", e);
}
}
public List<DependencyLink> getDependencies(Instant startTime, Instant endTime) {
try {
GetDependenciesRequest request = GetDependenciesRequest.newBuilder()
.setStartTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(startTime.getEpochSecond())
.setNanos(startTime.getNano())
.build())
.setEndTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(endTime.getEpochSecond())
.setNanos(endTime.getNano())
.build())
.build();
// GetDependencies belongs to the separate DependenciesReaderPlugin service, not SpanReaderPlugin;
// the channel factory caches channels, so building this stub per call is cheap
DependenciesReaderPluginGrpc.DependenciesReaderPluginBlockingStub dependenciesStub =
DependenciesReaderPluginGrpc.newBlockingStub(channelFactory.createChannel());
GetDependenciesResponse response = dependenciesStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getDependencies(request);
return response.getDependenciesList().stream()
.map(this::convertToDependencyLink)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get dependencies", e);
throw new JaegerStorageException("Dependencies retrieval failed", e);
}
}
private FindTracesRequest buildFindTracesRequest(TraceQuery query) {
TraceQueryParameters.Builder queryBuilder = TraceQueryParameters.newBuilder();
if (query.getServiceName() != null) {
queryBuilder.setServiceName(query.getServiceName());
}
if (query.getOperationName() != null) {
queryBuilder.setOperationName(query.getOperationName());
}
if (query.hasTimeRange()) {
queryBuilder.setStartTimeMin(toTimestamp(query.getStartTimeMin()));
queryBuilder.setStartTimeMax(toTimestamp(query.getStartTimeMax()));
} else {
// Default to the configured lookback window (24 hours unless overridden)
Instant end = Instant.now();
queryBuilder.setStartTimeMin(toTimestamp(end.minus(config.getTraceLookback())));
queryBuilder.setStartTimeMax(toTimestamp(end));
}
if (query.getDurationMin() != null) {
queryBuilder.setDurationMin(toProtoDuration(query.getDurationMin()));
}
if (query.getDurationMax() != null) {
queryBuilder.setDurationMax(toProtoDuration(query.getDurationMax()));
}
// In the storage plugin API, tags is a map<string, string>, not a list of KeyValue
queryBuilder.putAllTags(query.getTags());
// The storage plugin's TraceQueryParameters has num_traces rather than search_depth
queryBuilder.setNumTraces(query.getSearchDepth());
return FindTracesRequest.newBuilder()
.setQuery(queryBuilder.build())
.build();
}
private com.google.protobuf.Timestamp toTimestamp(Instant instant) {
return com.google.protobuf.Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
.setNanos(instant.getNano())
.build();
}
private com.google.protobuf.Duration toProtoDuration(Duration duration) {
return com.google.protobuf.Duration.newBuilder()
.setSeconds(duration.getSeconds())
.setNanos(duration.getNano())
.build();
}
private Trace convertToTrace(List<io.jaegertracing.api_v2.Span> protoSpans) {
if (protoSpans.isEmpty()) {
return null;
}
List<Span> spans = protoSpans.stream()
.map(this::convertFromProtoSpan)
.collect(Collectors.toList());
// Calculate trace duration
long minStartTime = spans.stream()
.mapToLong(Span::getStartTime)
.min()
.orElse(0);
long maxEndTime = spans.stream()
.mapToLong(span -> span.getStartTime() + span.getDuration())
.max()
.orElse(0);
// java.time.Duration has no ofMicros(); use the unit-based factory instead
Duration duration = Duration.of(maxEndTime - minStartTime, ChronoUnit.MICROS);
// Extract unique services
Map<String, Object> services = spans.stream()
.map(Span::getProcess)
.filter(Objects::nonNull)
.map(SpanProcess::getServiceName)
.distinct()
.collect(Collectors.toMap(
Function.identity(),
service -> Collections.emptyMap()
));
return Trace.builder()
.traceId(bytesToHex(protoSpans.get(0).getTraceId().toByteArray()))
.spans(spans)
.startTime(Instant.EPOCH.plus(minStartTime, ChronoUnit.MICROS))
.duration(duration)
.services(services)
.build();
}
private Span convertFromProtoSpan(io.jaegertracing.api_v2.Span protoSpan) {
Span.SpanBuilder builder = Span.builder()
.traceId(bytesToHex(protoSpan.getTraceId().toByteArray()))
.spanId(bytesToHex(protoSpan.getSpanId().toByteArray()))
.operationName(protoSpan.getOperationName())
.startTime(protoSpan.getStartTime().getSeconds() * 1_000_000 + protoSpan.getStartTime().getNanos() / 1000)
.duration(protoSpan.getDuration().getSeconds() * 1_000_000 + protoSpan.getDuration().getNanos() / 1000);
// Parent span ID: the api_v2 Span has no parent_span_id field, so use the first CHILD_OF reference
protoSpan.getReferencesList().stream()
.filter(ref -> ref.getRefType() == SpanRefType.CHILD_OF)
.findFirst()
.ifPresent(ref -> builder.parentSpanId(bytesToHex(ref.getSpanId().toByteArray())));
// Tags: keep every value type (as a string) rather than only STRING tags, because
// the "error" tag, for example, is usually recorded as a BOOL
if (protoSpan.getTagsCount() > 0) {
Map<String, String> tags = new HashMap<>();
protoSpan.getTagsList().forEach(tag -> tags.put(tag.getKey(), tagValueAsString(tag)));
builder.tags(tags);
}
// Process
if (protoSpan.hasProcess()) {
Process process = protoSpan.getProcess();
SpanProcess spanProcess = SpanProcess.builder()
.serviceName(process.getServiceName())
.build();
if (process.getTagsCount() > 0) {
Map<String, String> processTags = new HashMap<>();
process.getTagsList().forEach(tag -> processTags.put(tag.getKey(), tagValueAsString(tag)));
spanProcess.setTags(processTags);
}
builder.process(spanProcess);
}
return builder.build();
}
private String tagValueAsString(KeyValue tag) {
switch (tag.getVType()) {
case BOOL:
return String.valueOf(tag.getVBool());
case INT64:
return String.valueOf(tag.getVInt64());
case FLOAT64:
return String.valueOf(tag.getVFloat64());
case BINARY:
return bytesToHex(tag.getVBinary().toByteArray());
case STRING:
default:
return tag.getVStr();
}
}
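// The proto message shares its simple name with the model class, so it is fully qualified here
private DependencyLink convertToDependencyLink(io.jaegertracing.api_v2.DependencyLink protoLink) {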
return DependencyLink.builder()
.parent(protoLink.getParent())
.child(protoLink.getChild())
.callCount(protoLink.getCallCount())
.source("jaeger")
.build();
}
private String bytesToHex(byte[] bytes) {
StringBuilder hex = new StringBuilder();
for (byte b : bytes) {
hex.append(String.format("%02x", b));
}
return hex.toString();
}
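private byte[] hexToBytes(String hex) {
// Used only for trace IDs in this class: left-pad to the 16-byte width of api_v2
// trace IDs so that 64-bit (16-hex-digit) IDs are accepted as well
int byteLength = 16;
if (hex == null || hex.isEmpty() || hex.length() > byteLength * 2) {
throw new IllegalArgumentException("Invalid trace ID: " + hex);
}
String padded = "0".repeat(byteLength * 2 - hex.length()) + hex;
byte[] data = new byte[byteLength];
for (int i = 0; i < byteLength; i++) {
int high = Character.digit(padded.charAt(2 * i), 16);
int low = Character.digit(padded.charAt(2 * i + 1), 16);
if (high < 0 || low < 0) {
throw new IllegalArgumentException("Not a hex trace ID: " + hex);
}
data[i] = (byte) ((high << 4) | low);
}
return data;
}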
private void initializeStub() {
ManagedChannel channel = channelFactory.createChannel();
this.blockingStub = SpanReaderPluginGrpc.newBlockingStub(channel);
}
}
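FindTraces streams SpansResponseChunk messages, and a single trace may be split across several chunks. Spans therefore have to be grouped by trace ID before Trace objects are built. The sketch below shows that grouping in a generic form that does not depend on the generated classes. ChunkAssembler is a hypothetical name, and each chunk is modeled as a plain List.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: groups spans from a sequence of chunks by trace ID. It is generic
// over the span type, so it works with generated proto spans or the tutorial's model Span.
final class ChunkAssembler {

    private ChunkAssembler() {
    }

    static <S> Map<String, List<S>> groupByTrace(Iterator<? extends List<S>> chunks,
                                                 Function<S, String> traceIdOf) {
        // LinkedHashMap keeps traces in the order their first span arrived.
        Map<String, List<S>> byTrace = new LinkedHashMap<>();
        while (chunks.hasNext()) {
            for (S span : chunks.next()) {
                byTrace.computeIfAbsent(traceIdOf.apply(span), id -> new ArrayList<>()).add(span);
            }
        }
        return byTrace;
    }
}
```

In the reader, each chunk would be the getSpansList() of one SpansResponseChunk. Each group's values then go to convertToTrace.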

Step 5: Trace Analysis Service

@Service
@Slf4j
public class TraceAnalysisService {
private final JaegerSpanReader spanReader;
public TraceAnalysisService(JaegerSpanReader spanReader) {
this.spanReader = spanReader;
}
public Map<String, Object> analyzeTrace(String traceId) {
Trace trace = spanReader.getTrace(traceId);
if (trace == null) {
throw new JaegerStorageException("Trace not found: " + traceId);
}
Map<String, Object> analysis = new HashMap<>();
analysis.put("traceId", traceId);
analysis.put("totalSpans", trace.getSpans().size());
analysis.put("durationMs", trace.getDuration().toMillis());
analysis.put("startTime", trace.getStartTime());
analysis.put("services", trace.getServices().keySet());
// Service analysis
Map<String, Object> serviceAnalysis = analyzeServices(trace);
analysis.put("serviceAnalysis", serviceAnalysis);
// Error analysis
Map<String, Object> errorAnalysis = analyzeErrors(trace);
analysis.put("errorAnalysis", errorAnalysis);
// Performance analysis
Map<String, Object> performanceAnalysis = analyzePerformance(trace);
analysis.put("performanceAnalysis", performanceAnalysis);
return analysis;
}
public List<ServiceOperation> getTopOperations(String serviceName, Instant start, Instant end, int limit) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.limit(1000) // Get enough traces for analysis
.build();
TraceSearchResult result = spanReader.findTraces(query);
// Aggregate operations data
Map<String, ServiceOperationStats> operationStats = new HashMap<>();
for (Trace trace : result.getTraces()) {
List<Span> serviceSpans = trace.getSpansByService(serviceName);
for (Span span : serviceSpans) {
String operation = span.getOperationName();
ServiceOperationStats stats = operationStats.computeIfAbsent(operation, 
k -> new ServiceOperationStats());
stats.spanCount++;
stats.totalDuration += span.getDuration();
if (span.hasError()) {
stats.errorCount++;
}
}
}
return operationStats.entrySet().stream()
.map(entry -> {
ServiceOperationStats stats = entry.getValue();
return ServiceOperation.builder()
.service(serviceName)
.operation(entry.getKey())
.spanCount(stats.spanCount)
.avgDuration(stats.spanCount > 0 ? stats.totalDuration / stats.spanCount / 1000.0 : 0)
.errorRate(stats.spanCount > 0 ? (double) stats.errorCount / stats.spanCount : 0)
.build();
})
.sorted((a, b) -> Long.compare(b.getSpanCount(), a.getSpanCount()))
.limit(limit)
.collect(Collectors.toList());
}
public Map<String, List<DependencyLink>> getServiceDependencies(Instant start, Instant end) {
List<DependencyLink> dependencies = spanReader.getDependencies(start, end);
return dependencies.stream()
.collect(Collectors.groupingBy(DependencyLink::getParent));
}
public List<String> findSlowTraces(String serviceName, Duration threshold, Instant start, Instant end) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.durationMin(threshold)
.limit(100)
.build();
TraceSearchResult result = spanReader.findTraces(query);
return result.getTraces().stream()
.map(Trace::getTraceId)
.collect(Collectors.toList());
}
public List<String> findErrorTraces(String serviceName, Instant start, Instant end) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.tags(Collections.singletonMap("error", "true"))
.limit(100)
.build();
TraceSearchResult result = spanReader.findTraces(query);
return result.getTraces().stream()
.map(Trace::getTraceId)
.collect(Collectors.toList());
}
private Map<String, Object> analyzeServices(Trace trace) {
Map<String, Object> serviceAnalysis = new HashMap<>();
Map<String, ServiceStats> stats = new HashMap<>();
for (Span span : trace.getSpans()) {
String serviceName = span.getProcess().getServiceName();
ServiceStats serviceStats = stats.computeIfAbsent(serviceName, k -> new ServiceStats());
serviceStats.spanCount++;
serviceStats.totalDuration += span.getDuration();
if (span.hasError()) {
serviceStats.errorCount++;
}
}
stats.forEach((serviceName, serviceStats) -> {
Map<String, Object> serviceData = new HashMap<>();
serviceData.put("spanCount", serviceStats.spanCount);
serviceData.put("avgDurationMs", serviceStats.spanCount > 0 ? 
serviceStats.totalDuration / serviceStats.spanCount / 1000.0 : 0);
serviceData.put("errorCount", serviceStats.errorCount);
serviceData.put("errorRate", serviceStats.spanCount > 0 ? 
(double) serviceStats.errorCount / serviceStats.spanCount : 0);
serviceAnalysis.put(serviceName, serviceData);
});
return serviceAnalysis;
}
private Map<String, Object> analyzeErrors(Trace trace) {
Map<String, Object> errorAnalysis = new HashMap<>();
List<Map<String, Object>> errors = new ArrayList<>();
for (Span span : trace.getSpans()) {
if (span.hasError()) {
Map<String, Object> error = new HashMap<>();
error.put("service", span.getProcess().getServiceName());
error.put("operation", span.getOperationName());
error.put("spanId", span.getSpanId());
error.put("durationMs", span.getDuration() / 1000.0);
// Extract error details from tags
span.getTag("error.message").ifPresent(msg -> error.put("message", msg));
span.getTag("error.type").ifPresent(type -> error.put("type", type));
errors.add(error);
}
}
errorAnalysis.put("totalErrors", errors.size());
errorAnalysis.put("errors", errors);
errorAnalysis.put("servicesWithErrors", errors.stream()
.map(error -> (String) error.get("service"))
.distinct()
.collect(Collectors.toList()));
return errorAnalysis;
}
private Map<String, Object> analyzePerformance(Trace trace) {
Map<String, Object> performance = new HashMap<>();
if (trace.getSpans().isEmpty()) {
return performance;
}
// Find critical path
List<Span> criticalPath = findCriticalPath(trace);
performance.put("criticalPathLength", criticalPath.size());
performance.put("criticalPathDurationMs", criticalPath.stream()
.mapToLong(Span::getDuration)
.sum() / 1000.0);
// Bottleneck analysis
Optional<Span> bottleneck = trace.getSpans().stream()
.max(Comparator.comparingLong(Span::getDuration));
bottleneck.ifPresent(span -> {
Map<String, Object> bottleneckInfo = new HashMap<>();
bottleneckInfo.put("service", span.getProcess().getServiceName());
bottleneckInfo.put("operation", span.getOperationName());
bottleneckInfo.put("durationMs", span.getDuration() / 1000.0);
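// Share of the whole trace taken by the longest span, as a percentage
// (java.time.Duration has no toMicros(), and zero-length traces are guarded)
long traceMicros = trace.getDuration().toNanos() / 1_000;
bottleneckInfo.put("percentage", traceMicros > 0 ? 100.0 * span.getDuration() / traceMicros : 0.0);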
performance.put("bottleneck", bottleneckInfo);
});
return performance;
}
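private List<Span> findCriticalPath(Trace trace) {
// Approximation only, not a true critical path: returns up to five of the longest spans
// that each take more than 10% of the trace. These spans can overlap (a parent contains
// its children), so the summed "criticalPathDurationMs" can exceed the trace duration.
// java.time.Duration has no toMicros(), so convert from nanoseconds
long traceMicros = trace.getDuration().toNanos() / 1_000;
return trace.getSpans().stream()
.filter(span -> span.getDuration() > traceMicros * 0.1)
.sorted(Comparator.comparingLong(Span::getDuration).reversed())
.limit(5)
.collect(Collectors.toList());
}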
// Helper classes for internal statistics
private static class ServiceStats {
int spanCount = 0;
long totalDuration = 0;
int errorCount = 0;
}
private static class ServiceOperationStats {
int spanCount = 0;
long totalDuration = 0;
int errorCount = 0;
}
}
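findCriticalPath above is a heuristic that keeps long spans above a 10% threshold; it is not a true critical path. A closer approximation walks down from the root span. The child that finishes last is on the path. The next span on the path is the latest child that finished before that one started, and so on. The sketch below implements that walk over a minimal stand-in span type. SpanNode is hypothetical, and timings are in microseconds as in the tutorial's Span model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a critical-path walk over a trace's span tree (times in microseconds).
final class CriticalPath {

    // Hypothetical minimal stand-in for the tutorial's Span model.
    static final class SpanNode {
        final String spanId;
        final String parentSpanId; // null for the root span
        final long start;
        final long duration;

        SpanNode(String spanId, String parentSpanId, long start, long duration) {
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
            this.start = start;
            this.duration = duration;
        }

        long end() {
            return start + duration;
        }
    }

    private CriticalPath() {
    }

    // Returns path spans in visit order: each parent first, then its critical children
    // from the latest-finishing one backwards in time.
    static List<SpanNode> compute(List<SpanNode> spans) {
        Map<String, List<SpanNode>> children = new HashMap<>();
        SpanNode root = null;
        for (SpanNode span : spans) {
            if (span.parentSpanId == null) {
                if (root == null || span.start < root.start) {
                    root = span; // the earliest parentless span acts as the root
                }
            } else {
                children.computeIfAbsent(span.parentSpanId, id -> new ArrayList<>()).add(span);
            }
        }
        List<SpanNode> path = new ArrayList<>();
        if (root != null) {
            walk(root, children, path);
        }
        return path;
    }

    private static void walk(SpanNode span, Map<String, List<SpanNode>> children, List<SpanNode> path) {
        path.add(span);
        List<SpanNode> kids = new ArrayList<>(children.getOrDefault(span.spanId, List.of()));
        kids.sort(Comparator.comparingLong(SpanNode::end).reversed());
        // Moving backwards from the parent's end, take each child that finished before
        // the cursor; siblings overlapping an already chosen child run in parallel and are skipped.
        long cursor = span.end();
        for (SpanNode child : kids) {
            if (child.end() <= cursor) {
                walk(child, children, path);
                cursor = child.start;
            }
        }
    }
}
```

The walk ignores spans whose parent is missing from the trace. It skips overlapping siblings because they did not delay the parent's completion.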

Step 6: REST API Controllers

@RestController
@RequestMapping("/api/jaeger")
@Slf4j
public class JaegerStorageController {
@Autowired
private JaegerSpanReader spanReader;
@Autowired
private JaegerSpanWriter spanWriter;
@Autowired
private TraceAnalysisService analysisService;
@GetMapping("/traces/{traceId}")
public ResponseEntity<?> getTrace(@PathVariable String traceId) {
try {
Trace trace = spanReader.getTrace(traceId);
if (trace == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(trace);
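} catch (Exception e) {
log.error("Failed to get trace: {}", traceId, e);
// A storage failure is a server-side error, not a missing trace or a bad request
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ErrorResponse("TRACE_RETRIEVAL_ERROR", e.getMessage()));
}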
}
@PostMapping("/traces/search")
public ResponseEntity<?> searchTraces(@Valid @RequestBody TraceQuery query) {
try {
TraceSearchResult result = spanReader.findTraces(query);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("Failed to search traces", e);
return ResponseEntity.badRequest().body(new ErrorResponse("TRACE_SEARCH_ERROR", e.getMessage()));
}
}
@GetMapping("/traces/{traceId}/analysis")
public ResponseEntity<?> analyzeTrace(@PathVariable String traceId) {
try {
Map<String, Object> analysis = analysisService.analyzeTrace(traceId);
return ResponseEntity.ok(analysis);
} catch (Exception e) {
log.error("Failed to analyze trace: {}", traceId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("TRACE_ANALYSIS_ERROR", e.getMessage()));
}
}
@GetMapping("/services")
public ResponseEntity<?> getServices() {
try {
List<String> services = spanReader.getServices();
return ResponseEntity.ok(services);
} catch (Exception e) {
log.error("Failed to get services", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("SERVICES_ERROR", e.getMessage()));
}
}
@GetMapping("/services/{serviceName}/operations")
public ResponseEntity<?> getServiceOperations(@PathVariable String serviceName) {
try {
List<String> operations = spanReader.getOperations(serviceName);
return ResponseEntity.ok(operations);
} catch (Exception e) {
log.error("Failed to get operations for service: {}", serviceName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("OPERATIONS_ERROR", e.getMessage()));
}
}
@GetMapping("/services/{serviceName}/top-operations")
public ResponseEntity<?> getTopOperations(
@PathVariable String serviceName,
@RequestParam(defaultValue = "10") int limit,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
List<ServiceOperation> operations = analysisService.getTopOperations(
serviceName, start, end, limit);
return ResponseEntity.ok(operations);
} catch (Exception e) {
log.error("Failed to get top operations for service: {}", serviceName, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("TOP_OPERATIONS_ERROR", e.getMessage()));
}
}
@GetMapping("/dependencies")
public ResponseEntity<?> getDependencies(
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(24));
if (end == null) end = Instant.now();
Map<String, List<DependencyLink>> dependencies = analysisService.getServiceDependencies(start, end);
return ResponseEntity.ok(dependencies);
} catch (Exception e) {
log.error("Failed to get dependencies", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("DEPENDENCIES_ERROR", e.getMessage()));
}
}
@PostMapping("/spans")
public ResponseEntity<?> writeSpans(@Valid @RequestBody List<Span> spans) {
try {
WriteSpansResponse response = spanWriter.writeSpans(spans);
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to write spans", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new ErrorResponse("SPAN_WRITE_ERROR", e.getMessage()));
}
}
}
@Data
@AllArgsConstructor
class ErrorResponse {
private String errorCode;
private String message;
private LocalDateTime timestamp;
public ErrorResponse(String errorCode, String message) {
this.errorCode = errorCode;
this.message = message;
this.timestamp = LocalDateTime.now();
}
}
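The `getTrace` and `analyzeTrace` endpoints pass the path variable straight to the gRPC backend. Jaeger trace IDs are 64- or 128-bit values rendered as hexadecimal, so a cheap format check lets the controller answer a malformed ID with 400 Bad Request before any gRPC call is made. The `TraceIds` helper below is hypothetical, and the accepted range of 1 to 32 hex characters is an assumption chosen to cover both ID widths as well as renderings with leading zeros stripped.

```java
import java.util.regex.Pattern;

// Hypothetical validation helper; not part of Jaeger or Spring.
final class TraceIds {
    // 1 to 32 hex characters covers 64-bit and 128-bit IDs,
    // including renderings with leading zeros stripped.
    private static final Pattern TRACE_ID = Pattern.compile("^[0-9a-fA-F]{1,32}$");

    private TraceIds() {}

    static boolean isValid(String traceId) {
        return traceId != null && TRACE_ID.matcher(traceId).matches();
    }
}
```

In `getTrace`, a guard such as `if (!TraceIds.isValid(traceId)) return ResponseEntity.badRequest().body(new ErrorResponse("INVALID_TRACE_ID", "Trace ID must be hexadecimal"));` keeps client mistakes separate from backend failures; the `INVALID_TRACE_ID` code is likewise illustrative.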

Step 7: Exception Handling

public class JaegerStorageException extends RuntimeException {
public JaegerStorageException(String message) {
super(message);
}
public JaegerStorageException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
public class JaegerStorageExceptionHandler {
@ExceptionHandler(JaegerStorageException.class)
public ResponseEntity<ErrorResponse> handleJaegerStorageException(JaegerStorageException ex) {
ErrorResponse error = new ErrorResponse("JAEGER_STORAGE_ERROR", ex.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
@ExceptionHandler(StatusRuntimeException.class)
public ResponseEntity<ErrorResponse> handleGrpcException(StatusRuntimeException ex) {
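// getDescription() may be null, so fall back to the status code name
String description = ex.getStatus().getDescription() != null
? ex.getStatus().getDescription()
: ex.getStatus().getCode().name();
ErrorResponse error = new ErrorResponse("GRPC_ERROR", description);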
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
}
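`handleGrpcException` answers every `StatusRuntimeException` with 500, which hides useful distinctions such as a missing trace or an unreachable storage plugin. One option, sketched below, maps the numeric gRPC code (available in grpc-java as `ex.getStatus().getCode().value()`) to an HTTP status following the HTTP mapping documented for `google.rpc.Code`. The `GrpcHttpStatus` class is a hypothetical helper, and working on plain `int` codes is a simplification so the sketch compiles without gRPC on the classpath.

```java
// Hypothetical helper: gRPC status code (numeric value) -> HTTP status code,
// following the HTTP mapping listed for google.rpc.Code.
final class GrpcHttpStatus {
    private GrpcHttpStatus() {}

    static int toHttpStatus(int grpcCode) {
        switch (grpcCode) {
            case 0:  return 200; // OK
            case 1:  return 499; // CANCELLED (client closed request)
            case 3:              // INVALID_ARGUMENT
            case 9:              // FAILED_PRECONDITION
            case 11: return 400; // OUT_OF_RANGE
            case 4:  return 504; // DEADLINE_EXCEEDED
            case 5:  return 404; // NOT_FOUND
            case 6:              // ALREADY_EXISTS
            case 10: return 409; // ABORTED
            case 7:  return 403; // PERMISSION_DENIED
            case 8:  return 429; // RESOURCE_EXHAUSTED
            case 12: return 501; // UNIMPLEMENTED
            case 14: return 503; // UNAVAILABLE
            case 16: return 401; // UNAUTHENTICATED
            default: return 500; // UNKNOWN, INTERNAL, DATA_LOSS, unrecognized codes
        }
    }
}
```

The handler could then return `ResponseEntity.status(GrpcHttpStatus.toHttpStatus(ex.getStatus().getCode().value())).body(error)`, so a stopped storage plugin surfaces as 503 and a timed-out query as 504.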

Step 8: Configuration File

application.yml:

jaeger:
  storage:
    grpc-host: "localhost"
    grpc-port: 17271
    grpc-tls-enabled: false
    timeout-ms: 30000
    max-retries: 3
    max-concurrent-calls: 100
    keep-alive: true
    keep-alive-time-ms: 30000
    max-traces-per-query: 100
    trace-lookback: 24h
    enable-trace-cache: true
    trace-cache-ttl: 10m
    backend: "GRPC_PLUGIN"
    enable-metrics: true
    metrics-collection-interval: 30s
spring:
  cache:
    type: caffeine
logging:
  level:
    com.yourcompany.jaeger: DEBUG
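The `max-retries` setting implies that span writes and queries are retried on failure. A minimal retry loop with exponential backoff is sketched below. `RetryingCall` is a hypothetical helper, not part of grpc-java or the article's `JaegerSpanWriter`, and it assumes `max-retries` counts retries after the first attempt (so `3` means at most four calls). grpc-java can also retry transparently through a channel service config (`ManagedChannelBuilder.enableRetry()` with `defaultServiceConfig(...)`), which may be preferable when the retry policy is uniform.

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper; maxRetries is assumed to count retries after the first attempt.
final class RetryingCall {
    private RetryingCall() {}

    /**
     * Runs the call once, then retries it up to maxRetries more times while
     * isRetryable accepts the failure, doubling the pause after each failed attempt.
     */
    static <T> T withRetries(Supplier<T> call,
                             int maxRetries,
                             Duration initialBackoff,
                             Predicate<RuntimeException> isRetryable) {
        Duration backoff = initialBackoff;
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries || !isRetryable.test(e)) {
                    throw e; // out of retries, or a failure that retrying will not fix
                }
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    e.addSuppressed(ie);
                    throw e;
                }
                backoff = backoff.multipliedBy(2);
            }
        }
    }
}
```

With gRPC, a reasonable predicate retries only transient failures, for example `e -> e instanceof StatusRuntimeException && ((StatusRuntimeException) e).getStatus().getCode() == Status.Code.UNAVAILABLE`, so that invalid arguments fail immediately instead of being repeated.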

Step 9: Usage Examples

@Service
@Slf4j
public class JaegerExampleService {
@Autowired
private JaegerSpanWriter spanWriter;
@Autowired
private TraceAnalysisService analysisService;
public void demonstrateUsage() {
// Write spans to Jaeger
Span span = createExampleSpan();
spanWriter.writeSpans(Collections.singletonList(span));
// Analyze recent traces for a service
Instant end = Instant.now();
Instant start = end.minus(Duration.ofHours(1));
List<ServiceOperation> topOperations = analysisService.getTopOperations(
"order-service", start, end, 10);
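// SLF4J only substitutes {} placeholders, so format the average before logging
topOperations.forEach(op ->
log.info("Operation: {}, Count: {}, Avg Duration: {}ms",
op.getOperation(), op.getSpanCount(), String.format("%.2f", (double) op.getAvgDuration()))
);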
// Find slow traces
List<String> slowTraces = analysisService.findSlowTraces(
"order-service", Duration.ofMillis(1000), start, end);
log.info("Found {} slow traces", slowTraces.size());
}
private Span createExampleSpan() {
return Span.builder()
.traceId("1a2b3c4d5e6f7a8b") // Jaeger trace and span IDs are hexadecimal
.spanId("9c0d1e2f3a4b5c6d")
.operationName("processOrder")
.startTime(System.currentTimeMillis() * 1000) // epoch time in microseconds
.duration(150000) // 150ms in microseconds
.tags(Map.of(
"http.method", "POST",
"http.status_code", "200",
"component", "java-spring"
))
.process(SpanProcess.builder()
.serviceName("order-service")
.tags(Map.of(
"version", "1.0.0",
"environment", "production"
))
.build())
.build();
}
}
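The example hard-codes its trace and span IDs, so every call writes the same span. For synthetic or test spans, random IDs of the right width avoid collisions: 128 bits (32 hex characters) for a trace ID and 64 bits (16 hex characters) for a span ID. `TraceIdGenerator` is a hypothetical helper; in an application instrumented with a tracing SDK, the SDK generates these IDs itself.

```java
import java.security.SecureRandom;

// Hypothetical helper for synthetic spans; tracing SDKs normally generate IDs for you.
final class TraceIdGenerator {
    private static final SecureRandom RANDOM = new SecureRandom();

    private TraceIdGenerator() {}

    // W3C Trace Context treats all-zero IDs as invalid, so redraw the (very unlikely) zero.
    private static long nonZeroLong() {
        long value;
        do {
            value = RANDOM.nextLong();
        } while (value == 0L);
        return value;
    }

    /** 128-bit trace ID rendered as 32 lowercase hex characters. */
    static String newTraceId() {
        return String.format("%016x%016x", RANDOM.nextLong(), nonZeroLong());
    }

    /** 64-bit span ID rendered as 16 lowercase hex characters. */
    static String newSpanId() {
        return String.format("%016x", nonZeroLong());
    }
}
```

`%016x` zero-pads each 64-bit half and prints negative `long` values as their unsigned two's-complement form, so the output always has a fixed width. In `createExampleSpan`, the literals could become `.traceId(TraceIdGenerator.newTraceId())` and `.spanId(TraceIdGenerator.newSpanId())`.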

Key Features Implemented

  1. gRPC Storage Integration: Direct communication with Jaeger storage plugin
  2. Trace Writing: Efficient span writing with retry mechanisms
  3. Trace Querying: Flexible trace search and retrieval
  4. Trace Analysis: Comprehensive trace analysis and performance insights
  5. Dependency Analysis: Service dependency mapping
  6. Error Tracking: Error detection and analysis across traces
  7. Performance Monitoring: Bottleneck identification and critical path analysis

Best Practices

  1. Connection Management: Proper gRPC channel lifecycle management
  2. Error Handling: Comprehensive error handling with retry mechanisms
  3. Caching: Strategic caching of frequently accessed traces
  4. Monitoring: Metrics collection for performance monitoring
  5. Resource Cleanup: Proper shutdown of gRPC channels and connections
  6. Batch Operations: Efficient batch writing of spans
  7. Async Operations: Non-blocking operations for better performance
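For the batch-writing practice, one simple approach splits a large span list into fixed-size chunks and sends each chunk with its own `writeSpans` call, which bounds the size of every gRPC message. The `Batches` helper below is a hypothetical stdlib-only sketch; Guava's `Lists.partition` offers similar behavior as a view over the source list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: split a list into consecutive chunks of at most batchSize items.
final class Batches {
    private Batches() {}

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy each chunk so later changes to the source list do not affect it
            batches.add(Collections.unmodifiableList(new ArrayList<>(items.subList(from, to))));
        }
        return batches;
    }
}
```

A writer could then loop with `for (List<Span> batch : Batches.partition(spans, 500)) spanWriter.writeSpans(batch);`, where 500 is an illustrative batch size to tune against the storage plugin's message limits.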

Taken together, these components let a Java application write spans to, query, and analyze traces from a Jaeger storage backend over gRPC, and expose those operations through a REST API.
