Jaeger gRPC Storage Integration in Java

Jaeger is a distributed tracing system that helps monitor and troubleshoot microservices-based distributed systems. This guide demonstrates how to integrate Jaeger's gRPC storage plugin with Java applications for storing and querying trace data.

Why Use Jaeger gRPC Storage?

  • Unified Tracing: Collect traces across multiple services
  • gRPC Performance: High-performance binary protocol for storage operations
  • Flexible Backends: Works alongside Jaeger's storage backends (Cassandra, Elasticsearch, with Kafka as an ingestion buffer) and lets you plug in a custom backend over gRPC
  • Rich Querying: Complex trace querying and analysis capabilities
  • Microservices Ready: Designed for distributed systems

Prerequisites

  • Jaeger with gRPC storage plugin enabled
  • Java 11+ with gRPC capabilities
  • Maven/Gradle for dependency management

Step 1: Project Dependencies

Maven (pom.xml):

<dependencies>
<!-- gRPC Dependencies -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.53.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.53.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.53.0</version>
</dependency>
<!-- Jaeger Proto Dependencies -->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-proto</artifactId>
<version>1.8.1</version>
</dependency>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.22.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.22.2</version>
</dependency>
<!-- HTTP Client for REST fallback -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Spring Boot (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
<version>2.7.0</version>
</dependency>
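<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.10.5</version>
</dependency>
<!-- Lombok (provides the @Data, @Builder, and @Slf4j annotations used in this guide) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>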
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.22.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.53.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Step 2: Configuration Classes

@Configuration
@ConfigurationProperties(prefix = "jaeger.storage")
@Data
public class JaegerStorageConfig {
// gRPC Storage Plugin Configuration
private String grpcHost = "localhost";
private int grpcPort = 17271;
private boolean grpcTlsEnabled = false;
private String grpcTlsCertPath;
// Connection Settings
private long timeoutMs = 30000;
private int maxRetries = 3;
private int maxConcurrentCalls = 100;
private boolean keepAlive = true;
private long keepAliveTimeMs = 30000;
// Query Settings
private int maxTracesPerQuery = 100;
private Duration traceLookback = Duration.ofHours(24);
private boolean enableTraceCache = true;
private Duration traceCacheTtl = Duration.ofMinutes(10);
// Storage Backend Settings
private StorageBackend backend = StorageBackend.MEMORY;
private String cassandraContactPoints;
private String elasticsearchUrl;
private String kafkaBootstrapServers;
// Monitoring
private boolean enableMetrics = true;
private Duration metricsCollectionInterval = Duration.ofSeconds(30);
public enum StorageBackend {
MEMORY, CASSANDRA, ELASTICSEARCH, KAFKA, GRPC_PLUGIN
}
public String getGrpcEndpoint() {
return grpcHost + ":" + grpcPort;
}
}
@Component
@Slf4j
public class JaegerGrpcChannelFactory {
private final JaegerStorageConfig config;
private final Map<String, ManagedChannel> channels = new ConcurrentHashMap<>();
public JaegerGrpcChannelFactory(JaegerStorageConfig config) {
this.config = config;
}
public ManagedChannel createChannel() {
return createChannel(config.getGrpcEndpoint());
}
public ManagedChannel createChannel(String endpoint) {
return channels.computeIfAbsent(endpoint, this::buildChannel);
}
private ManagedChannel buildChannel(String endpoint) {
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoint);
if (config.isGrpcTlsEnabled()) {
try {
channelBuilder.useTransportSecurity();
if (config.getGrpcTlsCertPath() != null) {
// Load custom TLS certificate
channelBuilder.sslContext(buildSslContext());
}
} catch (Exception e) {
// Fail fast: silently downgrading to plaintext would send trace data unencrypted
throw new IllegalStateException("Failed to configure TLS for " + endpoint, e);
}
} else {
channelBuilder.usePlaintext();
}
// Configure connection settings
channelBuilder
.keepAliveWithoutCalls(config.isKeepAlive())
.keepAliveTime(config.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS)
.maxRetryAttempts(config.getMaxRetries())
.enableRetry();
ManagedChannel channel = channelBuilder.build();
log.info("Created gRPC channel to Jaeger storage: {}", endpoint);
return channel;
}
private SslContext buildSslContext() throws Exception {
// Trust the CA certificate(s) in the configured PEM file
return GrpcSslContexts.forClient()
.trustManager(new File(config.getGrpcTlsCertPath()))
.build();
}
@PreDestroy
public void shutdown() {
channels.values().forEach(ManagedChannel::shutdown);
log.info("Shutdown all Jaeger gRPC channels");
}
}
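
The channel factory above hands out one channel per endpoint by way of computeIfAbsent. As a minimal, library-free sketch of that caching pattern, the snippet below swaps the gRPC channel for a generic resource. EndpointCache and buildCount are hypothetical names introduced only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for the channel factory: caches one resource per endpoint.
final class EndpointCache<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> builder;
    private final AtomicInteger builds = new AtomicInteger();

    EndpointCache(Function<String, T> builder) {
        this.builder = builder;
    }

    // Returns the cached resource, building it only on the first request for an endpoint.
    T get(String endpoint) {
        return cache.computeIfAbsent(endpoint, key -> {
            builds.incrementAndGet();
            return builder.apply(key);
        });
    }

    // Number of times the builder function has actually run.
    int buildCount() {
        return builds.get();
    }
}
```

Repeated calls with the same endpoint return the same instance, which is why the factory's createChannel method is safe to call from several clients.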

Step 3: Core Data Models

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TraceQuery {
private String serviceName;
private String operationName;
private Map<String, String> tags;
private Instant startTimeMin;
private Instant startTimeMax;
private Duration durationMin;
private Duration durationMax;
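// @Builder.Default keeps these initializers when the object is created through the builder
@Builder.Default
private int searchDepth = 20;
@Builder.Default
private int limit = 100;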
public boolean hasTimeRange() {
return startTimeMin != null && startTimeMax != null;
}
public boolean hasDurationFilter() {
return durationMin != null || durationMax != null;
}
public Map<String, String> getTags() {
return tags != null ? tags : Collections.emptyMap();
}
}
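
TraceQuery treats a time range as present only when both bounds are set. The following sketch shows one way to resolve the effective search window, falling back to a lookback period otherwise. QueryWindow and its resolve method are hypothetical helpers, not part of Jaeger or of the classes in this guide.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: the effective [start, end] window of a trace search.
final class QueryWindow {
    final Instant start;
    final Instant end;

    private QueryWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    // Uses the explicit range when both bounds are set;
    // otherwise falls back to [now - lookback, now].
    static QueryWindow resolve(Instant min, Instant max, Duration lookback, Instant now) {
        if (min != null && max != null) {
            if (max.isBefore(min)) {
                throw new IllegalArgumentException("startTimeMax is before startTimeMin");
            }
            return new QueryWindow(min, max);
        }
        return new QueryWindow(now.minus(lookback), now);
    }
}
```

Passing the current time in as a parameter keeps the logic deterministic, which makes it straightforward to unit test.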
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Trace {
private String traceId;
private List<Span> spans;
private Map<String, String> processes;
private Instant startTime;
private Duration duration;
private Map<String, Object> services;
private Map<String, Object> operations;
public Optional<Span> getRootSpan() {
return spans.stream()
.filter(span -> span.getParentSpanId() == null || span.getParentSpanId().isEmpty())
.findFirst();
}
public List<Span> getSpansByService(String serviceName) {
return spans.stream()
.filter(span -> serviceName.equals(span.getProcess().getServiceName()))
.collect(Collectors.toList());
}
public Map<String, Long> getServiceDurations() {
return spans.stream()
.collect(Collectors.groupingBy(
span -> span.getProcess().getServiceName(),
Collectors.summingLong(Span::getDuration)
));
}
}
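
Trace.getServiceDurations sums span durations per service with groupingBy and summingLong. Here is a small standalone sketch of the same aggregation, using a hypothetical SpanSample stand-in rather than the Span model from this guide.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class ServiceDurations {
    // Minimal stand-in for a span: owning service and duration in microseconds.
    static final class SpanSample {
        final String service;
        final long durationMicros;

        SpanSample(String service, long durationMicros) {
            this.service = service;
            this.durationMicros = durationMicros;
        }
    }

    // Groups spans by service name and sums their durations (sorted by service name).
    static Map<String, Long> totalByService(List<SpanSample> spans) {
        return spans.stream().collect(Collectors.groupingBy(
                span -> span.service,
                TreeMap::new,
                Collectors.summingLong(span -> span.durationMicros)));
    }
}
```

Because a parent span's duration usually includes the time spent in its children, these totals can exceed the wall-clock duration of the trace.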
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Span {
private String traceId;
private String spanId;
private String parentSpanId;
private String operationName;
private SpanReference[] references;
private long startTime; // microseconds
private long duration; // microseconds
private Map<String, String> tags;
private SpanProcess process;
private SpanLog[] logs;
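public Instant getStartTimeInstant() {
// Preserve microsecond precision instead of truncating to milliseconds
return Instant.EPOCH.plus(startTime, ChronoUnit.MICROS);
}
public Duration getDurationAsDuration() {
// java.time.Duration has no ofMicros factory; use the generic one with ChronoUnit.MICROS
return Duration.of(duration, ChronoUnit.MICROS);
}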
public boolean hasError() {
return tags != null && tags.entrySet().stream()
.anyMatch(entry -> "error".equals(entry.getKey()) && "true".equals(entry.getValue()));
}
public Optional<String> getTag(String key) {
return tags != null ? Optional.ofNullable(tags.get(key)) : Optional.empty();
}
}
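
Span timestamps and durations are stored as microsecond counts. The sketch below shows one way to convert them to java.time types without dropping the microsecond part. MicrosTime is a hypothetical helper name.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for converting microsecond counts to java.time types and back.
final class MicrosTime {
    private MicrosTime() {
    }

    // Microseconds since the Unix epoch -> Instant, keeping the microsecond part.
    static Instant toInstant(long epochMicros) {
        return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    }

    // Microsecond count -> Duration.
    static Duration toDuration(long micros) {
        return Duration.of(micros, ChronoUnit.MICROS);
    }

    // Instant -> microseconds since the Unix epoch (sub-microsecond digits are dropped).
    static long toEpochMicros(Instant instant) {
        long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
        return Math.addExact(wholeSeconds, instant.getNano() / 1_000);
    }
}
```

Going through Instant.ofEpochMilli(micros / 1000) instead would discard the last three digits of every timestamp.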
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SpanProcess {
private String serviceName;
private Map<String, String> tags;
public Optional<String> getTag(String key) {
return tags != null ? Optional.ofNullable(tags.get(key)) : Optional.empty();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SpanLog {
private long timestamp; // microseconds
private Map<String, String> fields;
public Instant getTimestampInstant() {
// Preserve microsecond precision instead of truncating to milliseconds
return Instant.EPOCH.plus(timestamp, ChronoUnit.MICROS);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SpanReference {
private String traceId;
private String spanId;
private ReferenceType refType;
public enum ReferenceType {
CHILD_OF, FOLLOWS_FROM
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TraceSearchResult {
private List<Trace> traces;
private int total;
private int limit;
private int offset;
private Map<String, Object> errors;
public static TraceSearchResult empty() {
return TraceSearchResult.builder()
.traces(Collections.emptyList())
.total(0)
.limit(0)
.offset(0)
.build();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DependencyLink {
private String parent;
private String child;
private long callCount;
private String source;
private Map<String, Object> stats;
public String getKey() {
return parent + "->" + child;
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ServiceOperation {
private String service;
private String operation;
private long spanCount;
private double avgDuration;
private double errorRate;
}
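
ServiceOperation reports an average duration and an error rate. Assuming durations are recorded in microseconds, as in the Span model, these figures could be accumulated roughly as follows. OperationStats is a hypothetical class used only for this illustration.

```java
// Hypothetical accumulator for per-operation statistics.
final class OperationStats {
    private long spanCount;
    private long totalDurationMicros;
    private long errorCount;

    // Records one span with its duration in microseconds and its error flag.
    void record(long durationMicros, boolean error) {
        spanCount++;
        totalDurationMicros += durationMicros;
        if (error) {
            errorCount++;
        }
    }

    // Average duration in milliseconds; converts to double before dividing
    // so that fractional microseconds are not truncated.
    double avgDurationMs() {
        return spanCount == 0 ? 0.0 : totalDurationMicros / 1000.0 / spanCount;
    }

    // Fraction of recorded spans that were flagged as errors (0.0 to 1.0).
    double errorRate() {
        return spanCount == 0 ? 0.0 : (double) errorCount / spanCount;
    }
}
```

Dividing two long values first, as in total / count / 1000.0, would round the average down to a whole microsecond before the conversion to milliseconds.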

Step 4: gRPC Service Clients

Span Writer Client

@Service
@Slf4j
public class JaegerSpanWriter {
private final JaegerStorageConfig config;
private final JaegerGrpcChannelFactory channelFactory;
private SpanWriterPluginGrpc.SpanWriterPluginBlockingStub blockingStub;
private SpanWriterPluginGrpc.SpanWriterPluginFutureStub futureStub;
public JaegerSpanWriter(JaegerStorageConfig config, JaegerGrpcChannelFactory channelFactory) {
this.config = config;
this.channelFactory = channelFactory;
initializeStubs();
}
public WriteSpansResponse writeSpans(List<Span> spans) {
if (spans == null || spans.isEmpty()) {
return WriteSpansResponse.newBuilder().build();
}
try {
WriteSpansRequest request = buildWriteSpansRequest(spans);
return blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.writeSpans(request);
} catch (Exception e) {
log.error("Failed to write {} spans to Jaeger storage", spans.size(), e);
throw new JaegerStorageException("Span write operation failed", e);
}
}
public CompletableFuture<WriteSpansResponse> writeSpansAsync(List<Span> spans) {
if (spans == null || spans.isEmpty()) {
return CompletableFuture.completedFuture(WriteSpansResponse.newBuilder().build());
}
try {
WriteSpansRequest request = buildWriteSpansRequest(spans);
ListenableFuture<WriteSpansResponse> future = futureStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.writeSpans(request);
return convertToCompletableFuture(future);
} catch (Exception e) {
log.error("Failed to write {} spans asynchronously", spans.size(), e);
CompletableFuture<WriteSpansResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new JaegerStorageException("Async span write failed", e));
return failedFuture;
}
}
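public void writeSpansWithRetry(List<Span> spans, int maxRetries) {
// Without this guard, a non-positive value would skip the write without any error
if (maxRetries < 1) {
throw new IllegalArgumentException("maxRetries must be at least 1");
}
int attempt = 0;
while (attempt < maxRetries) {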
try {
writeSpans(spans);
return;
} catch (Exception e) {
attempt++;
if (attempt >= maxRetries) {
throw e;
}
log.warn("Write attempt {} failed, retrying...", attempt);
exponentialBackoff(attempt);
}
}
}
private WriteSpansRequest buildWriteSpansRequest(List<Span> spans) {
Batch batch = Batch.newBuilder()
.addAllSpans(spans.stream()
.map(this::convertToProtoSpan)
.collect(Collectors.toList()))
.setProcess(convertToProtoProcess(spans.get(0).getProcess()))
.build();
return WriteSpansRequest.newBuilder()
.setBatch(batch)
.build();
}
private io.jaegertracing.api_v2.Span convertToProtoSpan(Span span) {
io.jaegertracing.api_v2.Span.Builder builder = io.jaegertracing.api_v2.Span.newBuilder()
.setTraceId(ByteString.copyFrom(hexToBytes(span.getTraceId())))
.setSpanId(ByteString.copyFrom(hexToBytes(span.getSpanId())))
.setOperationName(span.getOperationName())
.setStartTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(span.getStartTime() / 1_000_000)
.setNanos((int) ((span.getStartTime() % 1_000_000) * 1_000))
.build())
.setDuration(com.google.protobuf.Duration.newBuilder()
.setSeconds(span.getDuration() / 1_000_000)
.setNanos((int) ((span.getDuration() % 1_000_000) * 1_000))
.build());
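// Add the parent link if present. Jaeger's span model has no parent_span_id field;
// the parent is expressed as a CHILD_OF reference.
if (span.getParentSpanId() != null && !span.getParentSpanId().isEmpty()) {
builder.addReferences(SpanRef.newBuilder()
.setTraceId(ByteString.copyFrom(hexToBytes(span.getTraceId())))
.setSpanId(ByteString.copyFrom(hexToBytes(span.getParentSpanId())))
.setRefType(SpanRefType.CHILD_OF)
.build());
}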
// Add tags
if (span.getTags() != null) {
span.getTags().forEach((key, value) -> 
builder.addTags(KeyValue.newBuilder()
.setKey(key)
.setVStr(value)
.setVType(ValueType.STRING)
.build())
);
}
// Add process
if (span.getProcess() != null) {
builder.setProcess(convertToProtoProcess(span.getProcess()));
}
return builder.build();
}
private Process convertToProtoProcess(SpanProcess process) {
Process.Builder builder = Process.newBuilder()
.setServiceName(process.getServiceName());
if (process.getTags() != null) {
process.getTags().forEach((key, value) ->
builder.addTags(KeyValue.newBuilder()
.setKey(key)
.setVStr(value)
.setVType(ValueType.STRING)
.build())
);
}
return builder.build();
}
private byte[] hexToBytes(String hex) {
int len = hex.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4)
+ Character.digit(hex.charAt(i + 1), 16));
}
return data;
}
private <T> CompletableFuture<T> convertToCompletableFuture(ListenableFuture<T> listenableFuture) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
Futures.addCallback(listenableFuture, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable t) {
completableFuture.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return completableFuture;
}
private void exponentialBackoff(int attempt) {
try {
Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Max 30 seconds
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JaegerStorageException("Write operation interrupted", e);
}
}
private void initializeStubs() {
ManagedChannel channel = channelFactory.createChannel();
this.blockingStub = SpanWriterPluginGrpc.newBlockingStub(channel);
this.futureStub = SpanWriterPluginGrpc.newFutureStub(channel);
}
}
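
The retry loop in JaegerSpanWriter waits longer after each failed attempt. As a sketch of the same capped exponential schedule, the hypothetical Backoff helper below uses long arithmetic and stops doubling once the cap is reached, so large attempt counts cannot overflow.

```java
// Hypothetical helper that computes a capped exponential backoff delay.
final class Backoff {
    private Backoff() {
    }

    // Returns min(baseMillis * 2^attempt, maxMillis) without overflowing.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis <= 0) {
            throw new IllegalArgumentException("attempt must be >= 0 and delays must be positive");
        }
        long delay = baseMillis;
        // Stop doubling as soon as the cap is reached.
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With a base of 1000 ms and a cap of 30000 ms, attempt 1 waits 2000 ms and every attempt from 5 onward waits the full 30000 ms. In practice a random jitter is often added on top so that many clients do not retry at the same moment.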

Span Reader Client

@Service
@Slf4j
public class JaegerSpanReader {
private final JaegerStorageConfig config;
private final JaegerGrpcChannelFactory channelFactory;
private SpanReaderPluginGrpc.SpanReaderPluginBlockingStub blockingStub;
public JaegerSpanReader(JaegerStorageConfig config, JaegerGrpcChannelFactory channelFactory) {
this.config = config;
this.channelFactory = channelFactory;
initializeStub();
}
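// Skip caching misses so that a trace written later can still be found
@Cacheable(value = "traces", key = "#traceId", unless = "#result == null")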
public Trace getTrace(String traceId) {
try {
GetTraceRequest request = GetTraceRequest.newBuilder()
.setTraceId(ByteString.copyFrom(hexToBytes(traceId)))
.build();
GetTraceResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getTrace(request);
return convertToTrace(response.getSpansList());
} catch (Exception e) {
log.error("Failed to get trace: {}", traceId, e);
throw new JaegerStorageException("Trace retrieval failed", e);
}
}
public TraceSearchResult findTraces(TraceQuery query) {
try {
FindTracesRequest request = buildFindTracesRequest(query);
Iterator<SpansResponseChunk> responseStream = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.findTraces(request);
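// A chunk is a transport unit, not a trace boundary: group spans by trace ID
Map<ByteString, List<io.jaegertracing.api_v2.Span>> spansByTrace = new LinkedHashMap<>();
while (responseStream.hasNext()) {
for (io.jaegertracing.api_v2.Span span : responseStream.next().getSpansList()) {
spansByTrace.computeIfAbsent(span.getTraceId(), id -> new ArrayList<>()).add(span);
}
}
List<Trace> traces = spansByTrace.values().stream()
.limit(Math.max(query.getLimit(), 0))
.map(this::convertToTrace)
.collect(Collectors.toList());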
return TraceSearchResult.builder()
.traces(traces)
.total(traces.size())
.limit(query.getLimit())
.offset(0)
.build();
} catch (Exception e) {
log.error("Failed to find traces for query: {}", query, e);
throw new JaegerStorageException("Trace search failed", e);
}
}
public List<String> getServices() {
try {
GetServicesRequest request = GetServicesRequest.newBuilder().build();
GetServicesResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getServices(request);
return response.getServicesList();
} catch (Exception e) {
log.error("Failed to get services list", e);
throw new JaegerStorageException("Services retrieval failed", e);
}
}
public List<String> getOperations(String serviceName) {
try {
GetOperationsRequest request = GetOperationsRequest.newBuilder()
.setService(serviceName)
.build();
GetOperationsResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getOperations(request);
return response.getOperationsList().stream()
.map(Operation::getName)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get operations for service: {}", serviceName, e);
throw new JaegerStorageException("Operations retrieval failed", e);
}
}
public List<DependencyLink> getDependencies(Instant startTime, Instant endTime) {
try {
GetDependenciesRequest request = GetDependenciesRequest.newBuilder()
.setStartTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(startTime.getEpochSecond())
.setNanos(startTime.getNano())
.build())
.setEndTime(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(endTime.getEpochSecond())
.setNanos(endTime.getNano())
.build())
.build();
GetDependenciesResponse response = blockingStub
.withDeadlineAfter(config.getTimeoutMs(), TimeUnit.MILLISECONDS)
.getDependencies(request);
return response.getDependenciesList().stream()
.map(this::convertToDependencyLink)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get dependencies", e);
throw new JaegerStorageException("Dependencies retrieval failed", e);
}
}
private FindTracesRequest buildFindTracesRequest(TraceQuery query) {
TraceQueryParameters.Builder queryBuilder = TraceQueryParameters.newBuilder();
if (query.getServiceName() != null) {
queryBuilder.setServiceName(query.getServiceName());
}
if (query.getOperationName() != null) {
queryBuilder.setOperationName(query.getOperationName());
}
if (query.hasTimeRange()) {
queryBuilder.setStartTimeMin(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(query.getStartTimeMin().getEpochSecond())
.setNanos(query.getStartTimeMin().getNano())
.build());
queryBuilder.setStartTimeMax(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(query.getStartTimeMax().getEpochSecond())
.setNanos(query.getStartTimeMax().getNano())
.build());
} else {
// Default to the configured lookback window (24 hours unless overridden)
Instant end = Instant.now();
Instant start = end.minus(config.getTraceLookback());
queryBuilder.setStartTimeMin(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(start.getEpochSecond())
.setNanos(start.getNano())
.build());
queryBuilder.setStartTimeMax(com.google.protobuf.Timestamp.newBuilder()
.setSeconds(end.getEpochSecond())
.setNanos(end.getNano())
.build());
}
if (query.hasDurationFilter()) {
if (query.getDurationMin() != null) {
queryBuilder.setDurationMin(com.google.protobuf.Duration.newBuilder()
.setSeconds(query.getDurationMin().getSeconds())
.setNanos(query.getDurationMin().getNano())
.build());
}
if (query.getDurationMax() != null) {
queryBuilder.setDurationMax(com.google.protobuf.Duration.newBuilder()
.setSeconds(query.getDurationMax().getSeconds())
.setNanos(query.getDurationMax().getNano())
.build());
}
}
// Add tags
query.getTags().forEach((key, value) ->
queryBuilder.addTags(KeyValue.newBuilder()
.setKey(key)
.setVStr(value)
.setVType(ValueType.STRING)
.build())
);
queryBuilder.setSearchDepth(query.getSearchDepth());
return FindTracesRequest.newBuilder()
.setQuery(queryBuilder.build())
.build();
}
private Trace convertToTrace(List<io.jaegertracing.api_v2.Span> protoSpans) {
if (protoSpans.isEmpty()) {
return null;
}
List<Span> spans = protoSpans.stream()
.map(this::convertFromProtoSpan)
.collect(Collectors.toList());
// Calculate trace duration
long minStartTime = spans.stream()
.mapToLong(Span::getStartTime)
.min()
.orElse(0);
long maxEndTime = spans.stream()
.mapToLong(span -> span.getStartTime() + span.getDuration())
.max()
.orElse(0);
Duration duration = Duration.of(maxEndTime - minStartTime, ChronoUnit.MICROS);
// Extract unique services
Map<String, Object> services = spans.stream()
.map(Span::getProcess)
.filter(Objects::nonNull)
.map(SpanProcess::getServiceName)
.distinct()
.collect(Collectors.toMap(
Function.identity(),
service -> Collections.emptyMap()
));
return Trace.builder()
.traceId(bytesToHex(protoSpans.get(0).getTraceId().toByteArray()))
.spans(spans)
.startTime(Instant.EPOCH.plus(minStartTime, ChronoUnit.MICROS))
.duration(duration)
.services(services)
.build();
}
private Span convertFromProtoSpan(io.jaegertracing.api_v2.Span protoSpan) {
Span.SpanBuilder builder = Span.builder()
.traceId(bytesToHex(protoSpan.getTraceId().toByteArray()))
.spanId(bytesToHex(protoSpan.getSpanId().toByteArray()))
.operationName(protoSpan.getOperationName())
.startTime(protoSpan.getStartTime().getSeconds() * 1_000_000 + protoSpan.getStartTime().getNanos() / 1000)
.duration(protoSpan.getDuration().getSeconds() * 1_000_000 + protoSpan.getDuration().getNanos() / 1000);
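// Parent span ID: taken from the first CHILD_OF reference, because the
// span model has no dedicated parent field
protoSpan.getReferencesList().stream()
.filter(ref -> ref.getRefType() == SpanRefType.CHILD_OF)
.findFirst()
.ifPresent(ref -> builder.parentSpanId(bytesToHex(ref.getSpanId().toByteArray())));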
// Tags
if (protoSpan.getTagsCount() > 0) {
Map<String, String> tags = new HashMap<>();
protoSpan.getTagsList().forEach(tag -> {
if (tag.getVType() == ValueType.STRING) {
tags.put(tag.getKey(), tag.getVStr());
}
});
builder.tags(tags);
}
// Process
if (protoSpan.hasProcess()) {
Process process = protoSpan.getProcess();
SpanProcess spanProcess = SpanProcess.builder()
.serviceName(process.getServiceName())
.build();
if (process.getTagsCount() > 0) {
Map<String, String> processTags = new HashMap<>();
process.getTagsList().forEach(tag -> {
if (tag.getVType() == ValueType.STRING) {
processTags.put(tag.getKey(), tag.getVStr());
}
});
spanProcess.setTags(processTags);
}
builder.process(spanProcess);
}
return builder.build();
}
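// The parameter is the protobuf message; it is fully qualified to avoid a clash with the local DependencyLink model
private DependencyLink convertToDependencyLink(io.jaegertracing.api_v2.DependencyLink protoLink) {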
return DependencyLink.builder()
.parent(protoLink.getParent())
.child(protoLink.getChild())
.callCount(protoLink.getCallCount())
.source("jaeger")
.build();
}
private String bytesToHex(byte[] bytes) {
StringBuilder hex = new StringBuilder();
for (byte b : bytes) {
hex.append(String.format("%02x", b));
}
return hex.toString();
}
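private byte[] hexToBytes(String hex) {
// Trace IDs arrive from callers, so reject malformed input instead of decoding garbage
if (hex == null || hex.length() % 2 != 0) {
throw new IllegalArgumentException("Hex ID must have an even number of characters: " + hex);
}
int len = hex.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
int high = Character.digit(hex.charAt(i), 16);
int low = Character.digit(hex.charAt(i + 1), 16);
if (high < 0 || low < 0) {
throw new IllegalArgumentException("Invalid hex ID: " + hex);
}
data[i / 2] = (byte) ((high << 4) | low);
}
return data;
}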
private void initializeStub() {
ManagedChannel channel = channelFactory.createChannel();
this.blockingStub = SpanReaderPluginGrpc.newBlockingStub(channel);
}
}
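
Both clients translate between hex strings and the raw bytes that the protobuf messages carry for trace and span IDs. A self-contained sketch of that round trip, with input validation, might look like the following. TraceIdHex is a hypothetical utility name and not part of any Jaeger library.

```java
// Hypothetical utility for converting trace/span IDs between hex strings and bytes.
final class TraceIdHex {
    private static final char[] DIGITS = "0123456789abcdef".toCharArray();

    private TraceIdHex() {
    }

    // Encodes bytes as a lowercase hex string, two characters per byte.
    static String encode(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            int value = bytes[i] & 0xFF;
            out[2 * i] = DIGITS[value >>> 4];
            out[2 * i + 1] = DIGITS[value & 0x0F];
        }
        return new String(out);
    }

    // Decodes a hex string, rejecting odd lengths and non-hex characters.
    static byte[] decode(String hex) {
        if (hex == null || hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even length");
        }
        byte[] data = new byte[hex.length() / 2];
        for (int i = 0; i < hex.length(); i += 2) {
            int high = Character.digit(hex.charAt(i), 16);
            int low = Character.digit(hex.charAt(i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("invalid hex character in: " + hex);
            }
            data[i / 2] = (byte) ((high << 4) | low);
        }
        return data;
    }
}
```

Keeping this in one shared class would also avoid maintaining two copies of hexToBytes in the writer and the reader.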

Step 5: Trace Analysis Service

@Service
@Slf4j
public class TraceAnalysisService {
private final JaegerSpanReader spanReader;
public TraceAnalysisService(JaegerSpanReader spanReader) {
this.spanReader = spanReader;
}
public Map<String, Object> analyzeTrace(String traceId) {
Trace trace = spanReader.getTrace(traceId);
if (trace == null) {
throw new JaegerStorageException("Trace not found: " + traceId);
}
Map<String, Object> analysis = new HashMap<>();
analysis.put("traceId", traceId);
analysis.put("totalSpans", trace.getSpans().size());
analysis.put("durationMs", trace.getDuration().toMillis());
analysis.put("startTime", trace.getStartTime());
analysis.put("services", trace.getServices().keySet());
// Service analysis
Map<String, Object> serviceAnalysis = analyzeServices(trace);
analysis.put("serviceAnalysis", serviceAnalysis);
// Error analysis
Map<String, Object> errorAnalysis = analyzeErrors(trace);
analysis.put("errorAnalysis", errorAnalysis);
// Performance analysis
Map<String, Object> performanceAnalysis = analyzePerformance(trace);
analysis.put("performanceAnalysis", performanceAnalysis);
return analysis;
}
public List<ServiceOperation> getTopOperations(String serviceName, Instant start, Instant end, int limit) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.limit(1000) // Get enough traces for analysis
.build();
TraceSearchResult result = spanReader.findTraces(query);
// Aggregate operations data
Map<String, ServiceOperationStats> operationStats = new HashMap<>();
for (Trace trace : result.getTraces()) {
List<Span> serviceSpans = trace.getSpansByService(serviceName);
for (Span span : serviceSpans) {
String operation = span.getOperationName();
ServiceOperationStats stats = operationStats.computeIfAbsent(operation, 
k -> new ServiceOperationStats());
stats.spanCount++;
stats.totalDuration += span.getDuration();
if (span.hasError()) {
stats.errorCount++;
}
}
}
return operationStats.entrySet().stream()
.map(entry -> {
ServiceOperationStats stats = entry.getValue();
return ServiceOperation.builder()
.service(serviceName)
.operation(entry.getKey())
.spanCount(stats.spanCount)
.avgDuration(stats.spanCount > 0 ? stats.totalDuration / 1000.0 / stats.spanCount : 0) // milliseconds; convert to double first to avoid integer truncation
.errorRate(stats.spanCount > 0 ? (double) stats.errorCount / stats.spanCount : 0)
.build();
})
.sorted((a, b) -> Long.compare(b.getSpanCount(), a.getSpanCount()))
.limit(limit)
.collect(Collectors.toList());
}
public Map<String, List<DependencyLink>> getServiceDependencies(Instant start, Instant end) {
List<DependencyLink> dependencies = spanReader.getDependencies(start, end);
return dependencies.stream()
.collect(Collectors.groupingBy(DependencyLink::getParent));
}
public List<String> findSlowTraces(String serviceName, Duration threshold, Instant start, Instant end) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.durationMin(threshold)
.limit(100)
.build();
TraceSearchResult result = spanReader.findTraces(query);
return result.getTraces().stream()
.map(Trace::getTraceId)
.collect(Collectors.toList());
}
public List<String> findErrorTraces(String serviceName, Instant start, Instant end) {
TraceQuery query = TraceQuery.builder()
.serviceName(serviceName)
.startTimeMin(start)
.startTimeMax(end)
.tags(Collections.singletonMap("error", "true"))
.limit(100)
.build();
TraceSearchResult result = spanReader.findTraces(query);
return result.getTraces().stream()
.map(Trace::getTraceId)
.collect(Collectors.toList());
}
private Map<String, Object> analyzeServices(Trace trace) {
Map<String, Object> serviceAnalysis = new HashMap<>();
Map<String, ServiceStats> stats = new HashMap<>();
for (Span span : trace.getSpans()) {
String serviceName = span.getProcess().getServiceName();
ServiceStats serviceStats = stats.computeIfAbsent(serviceName, k -> new ServiceStats());
serviceStats.spanCount++;
serviceStats.totalDuration += span.getDuration();
if (span.hasError()) {
serviceStats.errorCount++;
}
}
stats.forEach((serviceName, serviceStats) -> {
Map<String, Object> serviceData = new HashMap<>();
serviceData.put("spanCount", serviceStats.spanCount);
serviceData.put("avgDurationMs", serviceStats.spanCount > 0 ?
serviceStats.totalDuration / 1000.0 / serviceStats.spanCount : 0);
serviceData.put("errorCount", serviceStats.errorCount);
serviceData.put("errorRate", serviceStats.spanCount > 0 ? 
(double) serviceStats.errorCount / serviceStats.spanCount : 0);
serviceAnalysis.put(serviceName, serviceData);
});
return serviceAnalysis;
}
private Map<String, Object> analyzeErrors(Trace trace) {
Map<String, Object> errorAnalysis = new HashMap<>();
List<Map<String, Object>> errors = new ArrayList<>();
for (Span span : trace.getSpans()) {
if (span.hasError()) {
Map<String, Object> error = new HashMap<>();
error.put("service", span.getProcess().getServiceName());
error.put("operation", span.getOperationName());
error.put("spanId", span.getSpanId());
error.put("durationMs", span.getDuration() / 1000.0);
// Extract error details from tags
span.getTag("error.message").ifPresent(msg -> error.put("message", msg));
span.getTag("error.type").ifPresent(type -> error.put("type", type));
errors.add(error);
}
}
errorAnalysis.put("totalErrors", errors.size());
errorAnalysis.put("errors", errors);
errorAnalysis.put("servicesWithErrors", errors.stream()
.map(error -> (String) error.get("service"))
.distinct()
.collect(Collectors.toList()));
return errorAnalysis;
}
private Map<String, Object> analyzePerformance(Trace trace) {
Map<String, Object> performance = new HashMap<>();
if (trace.getSpans().isEmpty()) {
return performance;
}
// Find critical path
List<Span> criticalPath = findCriticalPath(trace);
performance.put("criticalPathLength", criticalPath.size());
performance.put("criticalPathDurationMs", criticalPath.stream()
.mapToLong(Span::getDuration)
.sum() / 1000.0);
// Bottleneck analysis
Optional<Span> bottleneck = trace.getSpans().stream()
.max(Comparator.comparingLong(Span::getDuration));
bottleneck.ifPresent(span -> {
Map<String, Object> bottleneckInfo = new HashMap<>();
bottleneckInfo.put("service", span.getProcess().getServiceName());
bottleneckInfo.put("operation", span.getOperationName());
bottleneckInfo.put("durationMs", span.getDuration() / 1000.0);
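// Duration has no toMicros(); derive microseconds from nanoseconds
long traceMicros = trace.getDuration().toNanos() / 1_000;
bottleneckInfo.put("percentage", traceMicros > 0 ? 100.0 * span.getDuration() / traceMicros : 0.0);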
performance.put("bottleneck", bottleneckInfo);
});
return performance;
}
private List<Span> findCriticalPath(Trace trace) {
// Heuristic stand-in for a critical path: up to five of the longest spans,
// each taking more than 10% of the trace duration.
// In production, you'd want a more sophisticated implementation
long traceMicros = trace.getDuration().toNanos() / 1_000; // Duration has no toMicros()
return trace.getSpans().stream()
.filter(span -> span.getDuration() > traceMicros * 0.1) // Spans > 10% of total
.sorted(Comparator.comparingLong(Span::getDuration).reversed())
.limit(5)
.collect(Collectors.toList());
}
// Helper classes for internal statistics
private static class ServiceStats {
int spanCount = 0;
long totalDuration = 0;
int errorCount = 0;
}
private static class ServiceOperationStats {
int spanCount = 0;
long totalDuration = 0;
int errorCount = 0;
}
}
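
The findCriticalPath method above is a placeholder that picks the longest spans. One somewhat closer approximation, sketched here under the assumption that each span has at most one parent and that the first parentless span is the root, walks down from the root and always follows the child that finishes last. CriticalPathSketch and Node are hypothetical names, and real traces with asynchronous or overlapping children may need a more careful algorithm.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CriticalPathSketch {
    // Minimal stand-in for a span; times are in microseconds.
    static final class Node {
        final String spanId;
        final String parentSpanId; // null for the root span
        final long startMicros;
        final long durationMicros;

        Node(String spanId, String parentSpanId, long startMicros, long durationMicros) {
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
            this.startMicros = startMicros;
            this.durationMicros = durationMicros;
        }

        long endMicros() {
            return startMicros + durationMicros;
        }
    }

    // Returns span IDs from the root downward, following the latest-finishing child at each level.
    static List<String> latestFinishingPath(List<Node> spans) {
        Map<String, List<Node>> children = new HashMap<>();
        Node root = null;
        for (Node node : spans) {
            if (node.parentSpanId == null) {
                if (root == null) {
                    root = node; // the first parentless span is treated as the root
                }
            } else {
                children.computeIfAbsent(node.parentSpanId, key -> new ArrayList<>()).add(node);
            }
        }
        List<String> path = new ArrayList<>();
        Node current = root;
        // The size bound guards against loops caused by duplicate span IDs.
        while (current != null && path.size() < spans.size()) {
            path.add(current.spanId);
            Node next = null;
            for (Node child : children.getOrDefault(current.spanId, Collections.emptyList())) {
                if (next == null || child.endMicros() > next.endMicros()) {
                    next = child;
                }
            }
            current = next;
        }
        return path;
    }
}
```

The idea behind the walk is that the child finishing last is the one its parent most plausibly waited for, so shortening any other sibling would not make the trace end sooner.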

Step 6: REST API Controllers

@RestController
@RequestMapping("/api/jaeger")
@Slf4j
public class JaegerStorageController {
@Autowired
private JaegerSpanReader spanReader;
@Autowired
private JaegerSpanWriter spanWriter;
@Autowired
private TraceAnalysisService analysisService;
@GetMapping("/traces/{traceId}")
public ResponseEntity<?> getTrace(@PathVariable String traceId) {
try {
Trace trace = spanReader.getTrace(traceId);
if (trace == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(trace);
} catch (Exception e) {
log.error("Failed to get trace: {}", traceId, e);
return ResponseEntity.badRequest().body(new ErrorResponse("TRACE_RETRIEVAL_ERROR", e.getMessage()));
}
}
@PostMapping("/traces/search")
public ResponseEntity<?> searchTraces(@Valid @RequestBody TraceQuery query) {
try {
TraceSearchResult result = spanReader.findTraces(query);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("Failed to search traces", e);
return ResponseEntity.badRequest().body(new ErrorResponse("TRACE_SEARCH_ERROR", e.getMessage()));
}
}
@GetMapping("/traces/{traceId}/analysis")
public ResponseEntity<?> analyzeTrace(@PathVariable String traceId) {
try {
Map<String, Object> analysis = analysisService.analyzeTrace(traceId);
return ResponseEntity.ok(analysis);
} catch (Exception e) {
log.error("Failed to analyze trace: {}", traceId, e);
return ResponseEntity.badRequest().body(new ErrorResponse("TRACE_ANALYSIS_ERROR", e.getMessage()));
}
}
@GetMapping("/services")
public ResponseEntity<?> getServices() {
try {
List<String> services = spanReader.getServices();
return ResponseEntity.ok(services);
} catch (Exception e) {
log.error("Failed to get services", e);
return ResponseEntity.badRequest().body(new ErrorResponse("SERVICES_ERROR", e.getMessage()));
}
}
@GetMapping("/services/{serviceName}/operations")
public ResponseEntity<?> getServiceOperations(@PathVariable String serviceName) {
try {
List<String> operations = spanReader.getOperations(serviceName);
return ResponseEntity.ok(operations);
} catch (Exception e) {
log.error("Failed to get operations for service: {}", serviceName, e);
return ResponseEntity.badRequest().body(new ErrorResponse("OPERATIONS_ERROR", e.getMessage()));
}
}
@GetMapping("/services/{serviceName}/top-operations")
public ResponseEntity<?> getTopOperations(
@PathVariable String serviceName,
@RequestParam(defaultValue = "10") int limit,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
List<ServiceOperation> operations = analysisService.getTopOperations(
serviceName, start, end, limit);
return ResponseEntity.ok(operations);
} catch (Exception e) {
log.error("Failed to get top operations for service: {}", serviceName, e);
return ResponseEntity.badRequest().body(new ErrorResponse("TOP_OPERATIONS_ERROR", e.getMessage()));
}
}
@GetMapping("/dependencies")
public ResponseEntity<?> getDependencies(
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(24));
if (end == null) end = Instant.now();
Map<String, List<DependencyLink>> dependencies = analysisService.getServiceDependencies(start, end);
return ResponseEntity.ok(dependencies);
} catch (Exception e) {
log.error("Failed to get dependencies", e);
return ResponseEntity.badRequest().body(new ErrorResponse("DEPENDENCIES_ERROR", e.getMessage()));
}
}
@PostMapping("/spans")
public ResponseEntity<?> writeSpans(@Valid @RequestBody List<Span> spans) {
try {
WriteSpansResponse response = spanWriter.writeSpans(spans);
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to write spans", e);
return ResponseEntity.badRequest().body(new ErrorResponse("SPAN_WRITE_ERROR", e.getMessage()));
}
}
}
@Data
@AllArgsConstructor
class ErrorResponse {
private String errorCode;
private String message;
private LocalDateTime timestamp;
public ErrorResponse(String errorCode, String message) {
this.errorCode = errorCode;
this.message = message;
this.timestamp = LocalDateTime.now();
}
}
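
Both getTopOperations and getDependencies fill in a missing start or end before calling the analysis service. A minimal sketch of pulling that logic into one helper follows. TimeWindow is a hypothetical name, not part of the guide's code, and the sketch makes two choices the controllers above do not: the default start is computed relative to the resolved end rather than to the current time, and an inverted range is rejected.

```java
import java.time.Duration;
import java.time.Instant;

final class TimeWindow {
    final Instant start;
    final Instant end;

    private TimeWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Resolves optional request parameters into a concrete window.
     * "now" is passed in so the method stays deterministic and easy to test.
     */
    static TimeWindow resolve(Instant start, Instant end, Duration defaultLookback, Instant now) {
        Instant resolvedEnd = (end != null) ? end : now;
        Instant resolvedStart = (start != null) ? start : resolvedEnd.minus(defaultLookback);
        if (!resolvedStart.isBefore(resolvedEnd)) {
            throw new IllegalArgumentException("start must be before end");
        }
        return new TimeWindow(resolvedStart, resolvedEnd);
    }
}
```

A controller method could then call TimeWindow.resolve(start, end, Duration.ofHours(1), Instant.now()) and pass window.start and window.end to the service.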

Step 7: Exception Handling

public class JaegerStorageException extends RuntimeException {
public JaegerStorageException(String message) {
super(message);
}
public JaegerStorageException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
public class JaegerStorageExceptionHandler {
@ExceptionHandler(JaegerStorageException.class)
public ResponseEntity<ErrorResponse> handleJaegerStorageException(JaegerStorageException ex) {
ErrorResponse error = new ErrorResponse("JAEGER_STORAGE_ERROR", ex.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
@ExceptionHandler(StatusRuntimeException.class)
public ResponseEntity<ErrorResponse> handleGrpcException(StatusRuntimeException ex) {
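// getDescription() may be null, so fall back to the status code name
String description = ex.getStatus().getDescription();
ErrorResponse error = new ErrorResponse("GRPC_ERROR",
description != null ? description : ex.getStatus().getCode().name());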
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
}
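
The gRPC handler above answers every StatusRuntimeException with HTTP 500. If callers need to tell "not found" from "backend unavailable", the status code can be translated instead. The sketch below follows the conventional gRPC-to-HTTP mapping published with google.rpc.Code. GrpcHttpStatusMapper is a hypothetical helper, and it takes the code name as a string (for example ex.getStatus().getCode().name()) so that it compiles without the gRPC dependency.

```java
final class GrpcHttpStatusMapper {

    /** Maps a gRPC status code name to an HTTP status; unknown names fall back to 500. */
    static int toHttpStatus(String grpcCodeName) {
        if (grpcCodeName == null) {
            return 500;
        }
        switch (grpcCodeName) {
            case "OK":
                return 200;
            case "INVALID_ARGUMENT":
            case "FAILED_PRECONDITION":
            case "OUT_OF_RANGE":
                return 400;
            case "UNAUTHENTICATED":
                return 401;
            case "PERMISSION_DENIED":
                return 403;
            case "NOT_FOUND":
                return 404;
            case "ALREADY_EXISTS":
            case "ABORTED":
                return 409;
            case "RESOURCE_EXHAUSTED":
                return 429;
            case "CANCELLED":
                return 499;
            case "UNIMPLEMENTED":
                return 501;
            case "UNAVAILABLE":
                return 503;
            case "DEADLINE_EXCEEDED":
                return 504;
            default:
                // UNKNOWN, INTERNAL, DATA_LOSS and anything unrecognized
                return 500;
        }
    }
}
```

Inside handleGrpcException, the result could be passed to ResponseEntity.status(int) in place of the fixed INTERNAL_SERVER_ERROR.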

Step 8: Configuration File

application.yml:

jaeger:
  storage:
    grpc-host: "localhost"
    grpc-port: 17271
    grpc-tls-enabled: false
    timeout-ms: 30000
    max-retries: 3
    max-concurrent-calls: 100
    keep-alive: true
    keep-alive-time-ms: 30000
    max-traces-per-query: 100
    trace-lookback: 24h
    enable-trace-cache: true
    trace-cache-ttl: 10m
    backend: "GRPC_PLUGIN"
    enable-metrics: true
    metrics-collection-interval: 30s
spring:
  cache:
    type: caffeine
logging:
  level:
    com.yourcompany.jaeger: DEBUG
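
Values such as 24h, 10m and 30s use the short duration notation that Spring Boot converts to java.time.Duration when binding properties. The sketch below only illustrates what those strings mean. It is not Spring's converter: SimpleDurationParser is a hypothetical class that handles just the ms, s, m, h and d suffixes.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SimpleDurationParser {
    // A whole number followed by one unit suffix, e.g. "24h" or "500ms".
    private static final Pattern SIMPLE = Pattern.compile("^(\\d+)(ms|s|m|h|d)$");

    static Duration parse(String value) {
        Matcher matcher = SIMPLE.matcher(value.trim().toLowerCase(Locale.ROOT));
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "m":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default:
                return Duration.ofDays(amount); // only "d" can reach this branch
        }
    }
}
```

With this reading, trace-lookback: 24h equals Duration.ofHours(24) and trace-cache-ttl: 10m equals Duration.ofMinutes(10).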

Step 9: Usage Examples

@Service
@Slf4j
public class JaegerExampleService {
@Autowired
private JaegerSpanWriter spanWriter;
@Autowired
private TraceAnalysisService analysisService;
public void demonstrateUsage() {
// Write spans to Jaeger
Span span = createExampleSpan();
spanWriter.writeSpans(Collections.singletonList(span));
// Analyze recent traces for a service
Instant end = Instant.now();
Instant start = end.minus(Duration.ofHours(1));
List<ServiceOperation> topOperations = analysisService.getTopOperations(
"order-service", start, end, 10);
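// SLF4J only substitutes {} placeholders, so the decimal is formatted explicitly
topOperations.forEach(op ->
log.info("Operation: {}, Count: {}, Avg Duration: {}ms",
op.getOperation(), op.getSpanCount(), String.format("%.2f", op.getAvgDuration()))
);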
// Find slow traces
List<String> slowTraces = analysisService.findSlowTraces(
"order-service", Duration.ofMillis(1000), start, end);
log.info("Found {} slow traces", slowTraces.size());
}
private Span createExampleSpan() {
return Span.builder()
.traceId("1a2b3c4d5e6f7a8b") // 64-bit trace ID as 16 hex characters
.spanId("9a0b1c2d3e4f5a6b") // 64-bit span ID as 16 hex characters
.operationName("processOrder")
.startTime(System.currentTimeMillis() * 1000)
.duration(150000) // 150ms in microseconds
.tags(Map.of(
"http.method", "POST",
"http.status_code", "200",
"component", "java-spring"
))
.process(SpanProcess.builder()
.serviceName("order-service")
.tags(Map.of(
"version", "1.0.0",
"environment", "production"
))
.build())
.build();
}
}
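
Jaeger trace IDs are 64-bit or 128-bit values and span IDs are 64-bit values, usually written as hexadecimal strings. A small guard like the following could reject malformed IDs before they reach the writer or the /traces/{traceId} endpoint. JaegerIds is a hypothetical helper, and the sketch assumes IDs arrive as fixed-width lowercase hex without stripped leading zeros.

```java
import java.util.regex.Pattern;

final class JaegerIds {
    // 16 hex characters = 64 bits, 32 hex characters = 128 bits.
    private static final Pattern TRACE_ID = Pattern.compile("^(?:[0-9a-f]{16}|[0-9a-f]{32})$");
    private static final Pattern SPAN_ID = Pattern.compile("^[0-9a-f]{16}$");

    static boolean isValidTraceId(String id) {
        return id != null && TRACE_ID.matcher(id).matches();
    }

    static boolean isValidSpanId(String id) {
        return id != null && SPAN_ID.matcher(id).matches();
    }
}
```

A string containing letters outside a to f, such as g or h, fails both checks.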

Key Features Implemented

  1. gRPC Storage Integration: Direct communication with Jaeger storage plugin
  2. Trace Writing: Efficient span writing with retry mechanisms
  3. Trace Querying: Flexible trace search and retrieval
  4. Trace Analysis: Comprehensive trace analysis and performance insights
  5. Dependency Analysis: Service dependency mapping
  6. Error Tracking: Error detection and analysis across traces
  7. Performance Monitoring: Bottleneck identification and critical path analysis
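
The retry behavior mentioned for trace writing can be sketched as a bounded loop with exponential backoff. This is a generic illustration, not the guide's JaegerSpanWriter code: RetrySketch and its parameters are hypothetical, with maxRetries playing the role of the max-retries setting.

```java
import java.util.function.Supplier;

final class RetrySketch {

    /**
     * Runs the call, retrying up to maxRetries times after a failure.
     * The wait doubles after each failed attempt.
     */
    static <T> T callWithRetry(Supplier<T> call, int maxRetries, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                sleep(backoffMs);
                backoffMs *= 2;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }
}
```

In practice it is usually worth retrying only transient failures, for example a gRPC UNAVAILABLE status, and failing fast on errors such as INVALID_ARGUMENT.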

Best Practices

  1. Connection Management: Proper gRPC channel lifecycle management
  2. Error Handling: Comprehensive error handling with retry mechanisms
  3. Caching: Strategic caching of frequently accessed traces
  4. Monitoring: Metrics collection for performance monitoring
  5. Resource Cleanup: Proper shutdown of gRPC channels and connections
  6. Batch Operations: Efficient batch writing of spans
  7. Async Operations: Non-blocking operations for better performance
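
As a concrete illustration of the batch operations item, the sketch below splits a large list into fixed-size chunks so that each chunk can be sent in its own write call. BatchSketch is a hypothetical helper, and the batch size is an assumption to tune against the storage backend's limits.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {

    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }
}
```

Each resulting batch could then be passed to spanWriter.writeSpans(batch) in turn.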

Together, these pieces give a Java service a working path to write spans through Jaeger's gRPC storage plugin, query them back, and run basic performance and dependency analysis on the results.

