Distributed Tracing with Apache Cassandra: Java Implementation Guide


Article

Apache Cassandra's distributed nature, high write throughput, and time-series capabilities make it an excellent choice for storing distributed tracing data. This guide covers how to implement a tracing backend using Cassandra in Java, from data modeling to query optimization.

Why Cassandra for Tracing?

  • High Write Performance: Perfect for trace data ingestion
  • Horizontal Scalability: Handles growing trace volumes
  • Time-series Native: Traces are naturally time-ordered data
  • TTL Support: Automatic expiration of old trace data
  • Distributed Architecture: Aligns with microservices patterns

Project Setup and Dependencies

Maven Dependencies:

<properties>
<!-- Version of the DataStax Java driver artifact (not the Cassandra server version) -->
<cassandra.version>4.17.0</cassandra.version>
<opentelemetry.version>1.32.0</opentelemetry.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Cassandra Java Driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${cassandra.version}</version>
</dependency>
<!-- OpenTelemetry API -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>

1. Cassandra Data Model for Tracing

Schema Design:

-- Keyspace for tracing data.
-- NetworkTopologyStrategy with RF=3 in "datacenter1"; adjust the DC name to
-- match your cluster topology (see `nodetool status`).
CREATE KEYSPACE IF NOT EXISTS tracing
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3
};

-- Spans, partitioned per (UTC day, service) so writes spread across the
-- cluster and a time-range read for one service touches one partition per day.
CREATE TABLE tracing.spans (
    trace_id uuid,
    span_id uuid,
    parent_span_id uuid,
    service_name text,
    operation_name text,
    start_time timestamp,
    duration bigint,                      -- span duration in nanoseconds
    tags map<text, text>,
    logs list<frozen<map<text, text>>>,
    day_bucket text,                      -- ISO date (yyyy-MM-dd, UTC); composite partition key component
    PRIMARY KEY ((day_bucket, service_name), start_time, trace_id, span_id)
) WITH CLUSTERING ORDER BY (start_time DESC);

-- Lookup by trace_id.
-- NOTE: a secondary index on a high-cardinality column fans the query out to
-- every node; at large scale, prefer a dedicated lookup table keyed by trace_id.
CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON tracing.spans (trace_id);

-- Per-day service-to-service call statistics.
-- Cassandra counters can only be incremented/decremented, so an average cannot
-- be stored directly: keep a running duration total instead and compute
-- avg = total_duration_ms / call_count at read time.
CREATE TABLE tracing.service_dependencies (
    source_service text,
    target_service text,
    day_bucket text,
    call_count counter,
    error_count counter,
    total_duration_ms counter,
    PRIMARY KEY ((day_bucket, source_service), target_service)
);

-- Sampling decision per trace.
CREATE TABLE tracing.trace_samples (
    trace_id uuid,
    sampled boolean,
    reason text,
    PRIMARY KEY (trace_id)
);

2. Cassandra Tracing Repository

Core Span Repository:

package com.example.tracing.cassandra;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
 * Data-access layer for trace spans stored in Cassandra.
 *
 * <p>Spans live in {@code tracing.spans}, partitioned by
 * {@code (day_bucket, service_name)} and clustered by {@code start_time DESC}.
 * All statements are prepared once in the constructor and reused.
 */
public class CassandraSpanRepository {
    private final CqlSession session;
    private final PreparedStatement insertSpanStatement;
    private final PreparedStatement findSpansByTraceIdStatement;
    private final PreparedStatement findSpansByServiceStatement;

    public CassandraSpanRepository(CqlSession session) {
        this.session = session;
        // Prepare once: the driver caches the server-side plan, so bound
        // executions avoid re-parsing the CQL on every call.
        this.insertSpanStatement = session.prepare(
                "INSERT INTO tracing.spans (trace_id, span_id, parent_span_id, service_name, " +
                "operation_name, start_time, duration, tags, logs, day_bucket) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        );
        // Served by the idx_spans_trace_id secondary index.
        this.findSpansByTraceIdStatement = session.prepare(
                "SELECT * FROM tracing.spans WHERE trace_id = ?"
        );
        this.findSpansByServiceStatement = session.prepare(
                "SELECT * FROM tracing.spans WHERE day_bucket = ? AND service_name = ? " +
                "AND start_time >= ? AND start_time <= ?"
        );
    }

    /**
     * Persists one span. The partition (day bucket) is derived from the span's
     * start time in UTC.
     */
    public void storeSpan(Span span) {
        String dayBucket = getDayBucket(span.getStartTime());
        BoundStatement boundStatement = insertSpanStatement.bind()
                .setUuid("trace_id", span.getTraceId())
                .setUuid("span_id", span.getSpanId())
                .setUuid("parent_span_id", span.getParentSpanId())
                .setString("service_name", span.getServiceName())
                .setString("operation_name", span.getOperationName())
                .setInstant("start_time", span.getStartTime())
                .setLong("duration", span.getDuration())
                .setMap("tags", span.getTags(), String.class, String.class)
                .setList("logs", convertLogsToMapList(span.getLogs()), Map.class)
                .setString("day_bucket", dayBucket);
        session.execute(boundStatement);
    }

    /**
     * Loads every stored span of a trace, sorted by start time so callers can
     * reconstruct the trace in order.
     */
    public List<Span> findSpansByTraceId(UUID traceId) {
        BoundStatement boundStatement = findSpansByTraceIdStatement.bind(traceId);
        ResultSet resultSet = session.execute(boundStatement);
        List<Span> spans = new ArrayList<>();
        for (Row row : resultSet) {
            spans.add(mapRowToSpan(row));
        }
        // Sort by start time for proper trace reconstruction
        spans.sort(Comparator.comparing(Span::getStartTime));
        return spans;
    }

    /**
     * Returns spans for one service between {@code startTime} and
     * {@code endTime} (both inclusive).
     *
     * <p>Two fixes over the naive version:
     * <ul>
     *   <li>Values are bound positionally. Binding by name would assign both
     *       range bounds through the single marker name {@code start_time},
     *       silently clobbering the lower bound with the upper one.</li>
     *   <li>The table is partitioned per UTC day, so a range that crosses
     *       midnight is resolved by querying every day bucket in the range.</li>
     * </ul>
     */
    public List<Span> findSpansByService(String serviceName, Instant startTime, Instant endTime) {
        List<Span> spans = new ArrayList<>();
        LocalDate firstDay = LocalDate.ofInstant(startTime, java.time.ZoneId.of("UTC"));
        LocalDate lastDay = LocalDate.ofInstant(endTime, java.time.ZoneId.of("UTC"));
        for (LocalDate day = firstDay; !day.isAfter(lastDay); day = day.plusDays(1)) {
            BoundStatement boundStatement = findSpansByServiceStatement.bind(
                    day.format(DateTimeFormatter.ISO_DATE), serviceName, startTime, endTime);
            for (Row row : session.execute(boundStatement)) {
                spans.add(mapRowToSpan(row));
            }
        }
        return spans;
    }

    /** Converts one result row into the domain model. */
    private Span mapRowToSpan(Row row) {
        return new Span(
                row.getUuid("trace_id"),
                row.getUuid("span_id"),
                row.getUuid("parent_span_id"),
                row.getString("service_name"),
                row.getString("operation_name"),
                row.getInstant("start_time"),
                row.getLong("duration"),
                row.getMap("tags", String.class, String.class),
                convertMapListToLogs(row.getList("logs", Map.class))
        );
    }

    /** Day-bucket key: the UTC calendar date of the timestamp, ISO formatted (yyyy-MM-dd). */
    private String getDayBucket(Instant timestamp) {
        return LocalDate.ofInstant(timestamp, java.time.ZoneId.of("UTC"))
                .format(DateTimeFormatter.ISO_DATE);
    }

    /**
     * Flattens structured logs into the map shape of the {@code logs} column:
     * the timestamp and message ride alongside the user fields under reserved
     * keys "timestamp" and "message".
     */
    private List<Map<String, String>> convertLogsToMapList(List<SpanLog> logs) {
        List<Map<String, String>> result = new ArrayList<>();
        for (SpanLog log : logs) {
            Map<String, String> logMap = new HashMap<>();
            logMap.put("timestamp", log.getTimestamp().toString());
            logMap.put("message", log.getMessage());
            logMap.putAll(log.getFields());
            result.add(logMap);
        }
        return result;
    }

    /** Inverse of {@link #convertLogsToMapList}: restores SpanLog objects from stored maps. */
    private List<SpanLog> convertMapListToLogs(List<Map<String, String>> mapList) {
        List<SpanLog> logs = new ArrayList<>();
        if (mapList == null) {
            return logs;
        }
        for (Map<String, String> raw : mapList) {
            // Copy before mutating: the driver may hand back immutable maps,
            // so remove() on them could throw UnsupportedOperationException.
            Map<String, String> fields = new HashMap<>(raw);
            Instant timestamp = Instant.parse(fields.remove("timestamp"));
            String message = fields.remove("message");
            logs.add(new SpanLog(timestamp, message, fields));
        }
        return logs;
    }
}

3. Domain Models

Span and Related Classes:

package com.example.tracing.model;
import java.time.Instant;
import java.util.*;
/**
 * One span of a distributed trace: an operation executed by a service,
 * identified within its trace and optionally linked to a parent span.
 *
 * <p>Identity fields are immutable; tags and logs may grow after construction
 * via {@link #addTag} and {@link #addLog}, but the getters hand out
 * unmodifiable views only.
 */
public class Span {
    private final UUID traceId;
    private final UUID spanId;
    private final UUID parentSpanId;
    private final String serviceName;
    private final String operationName;
    private final Instant startTime;
    private final long duration; // nanoseconds
    private final Map<String, String> tags;
    private final List<SpanLog> logs;

    public Span(UUID traceId, UUID spanId, UUID parentSpanId, String serviceName,
            String operationName, Instant startTime, long duration,
            Map<String, String> tags, List<SpanLog> logs) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
        this.serviceName = serviceName;
        this.operationName = operationName;
        this.startTime = startTime;
        this.duration = duration;
        // Defensive copies keep this span independent of caller-held collections.
        this.tags = new HashMap<>();
        if (tags != null) {
            this.tags.putAll(tags);
        }
        this.logs = new ArrayList<>();
        if (logs != null) {
            this.logs.addAll(logs);
        }
    }

    public UUID getTraceId() {
        return traceId;
    }

    public UUID getSpanId() {
        return spanId;
    }

    /** May be null for a root span. */
    public UUID getParentSpanId() {
        return parentSpanId;
    }

    public String getServiceName() {
        return serviceName;
    }

    public String getOperationName() {
        return operationName;
    }

    public Instant getStartTime() {
        return startTime;
    }

    /** Duration in nanoseconds. */
    public long getDuration() {
        return duration;
    }

    /** Read-only view; use {@link #addTag} to mutate. */
    public Map<String, String> getTags() {
        return Collections.unmodifiableMap(tags);
    }

    /** Read-only view; use {@link #addLog} to mutate. */
    public List<SpanLog> getLogs() {
        return Collections.unmodifiableList(logs);
    }

    public void addTag(String key, String value) {
        tags.put(key, value);
    }

    public void addLog(SpanLog log) {
        logs.add(log);
    }

    /**
     * A span is failed if its status tag is "error" or its "error" tag parses
     * as boolean true; absent tags count as no error.
     */
    public boolean hasError() {
        if ("error".equals(tags.get("span.status"))) {
            return true;
        }
        return Boolean.parseBoolean(tags.get("error"));
    }
}
/**
 * A timestamped log event attached to a span, with arbitrary string fields.
 * Timestamp and message are immutable; fields can be appended via
 * {@link #addField} and are exposed only as an unmodifiable view.
 */
class SpanLog {
    private final Instant timestamp;
    private final String message;
    private final Map<String, String> fields;

    public SpanLog(Instant timestamp, String message, Map<String, String> fields) {
        this.timestamp = timestamp;
        this.message = message;
        // Copy defensively; a null argument means "no fields".
        this.fields = new HashMap<>();
        if (fields != null) {
            this.fields.putAll(fields);
        }
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public String getMessage() {
        return message;
    }

    /** Read-only view over the fields; use {@link #addField} to mutate. */
    public Map<String, String> getFields() {
        return Collections.unmodifiableMap(fields);
    }

    public void addField(String key, String value) {
        fields.put(key, value);
    }
}

4. OpenTelemetry Cassandra Exporter

Custom Span Exporter:

package com.example.tracing.export;
import com.example.tracing.cassandra.CassandraSpanRepository;
import com.example.tracing.model.Span;
import com.example.tracing.model.SpanLog;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * OpenTelemetry {@link SpanExporter} that writes spans to Cassandra.
 *
 * <p>{@link #export} only enqueues; a single daemon thread drains the bounded
 * queue in batches, so the SDK's callback thread is never blocked on Cassandra
 * I/O. Under sustained overload, spans are dropped rather than buffered
 * without limit.
 */
public class CassandraSpanExporter implements SpanExporter {
    private final CassandraSpanRepository repository;
    private final BlockingQueue<Span> batchQueue;
    private final int batchSize;
    private final long exportIntervalMs;
    private final Thread exportThread;
    private volatile boolean running = true;

    /**
     * @param repository       destination span store
     * @param batchSize        maximum spans written per drain cycle
     * @param exportIntervalMs maximum time the writer thread waits for new spans
     */
    public CassandraSpanExporter(CassandraSpanRepository repository, int batchSize, long exportIntervalMs) {
        this.repository = repository;
        this.batchSize = batchSize;
        this.exportIntervalMs = exportIntervalMs;
        this.batchQueue = new LinkedBlockingQueue<>(10000); // bounded: backpressure / drop under overload
        this.exportThread = new Thread(this::exportLoop, "cassandra-span-exporter");
        this.exportThread.setDaemon(true);
        this.exportThread.start();
    }

    /** Enqueues finished spans for asynchronous storage; never blocks for more than ~100 ms per span. */
    @Override
    public CompletableResultCode export(Collection<SpanData> spans) {
        for (SpanData spanData : spans) {
            if (!running) {
                break;
            }
            Span span = convertToDomainSpan(spanData);
            try {
                // offer() waits up to 100 ms for queue space; if the queue is
                // still full the span is dropped (and the drop is logged) —
                // bounded memory is preferred over stalling the SDK.
                if (!batchQueue.offer(span, 100, TimeUnit.MILLISECONDS)) {
                    System.err.println("Span queue full, dropping span: " + span.getOperationName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return CompletableResultCode.ofFailure();
            }
        }
        return CompletableResultCode.ofSuccess();
    }

    /** Synchronously drains and stores everything currently queued. */
    @Override
    public CompletableResultCode flush() {
        exportBatch();
        return CompletableResultCode.ofSuccess();
    }

    /** Stops the writer thread, then flushes whatever remains in the queue. */
    @Override
    public CompletableResultCode shutdown() {
        running = false;
        exportThread.interrupt();
        try {
            exportThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flush();
    }

    /**
     * Writer-thread loop: wait up to exportIntervalMs for the first span, then
     * drain up to batchSize - 1 more and store them as one batch.
     */
    private void exportLoop() {
        List<Span> batch = new ArrayList<>(batchSize);
        while (running) {
            try {
                Span firstSpan = batchQueue.poll(exportIntervalMs, TimeUnit.MILLISECONDS);
                if (firstSpan == null) {
                    continue; // nothing arrived within the interval
                }
                batch.add(firstSpan);
                batchQueue.drainTo(batch, batchSize - 1);
                exportBatch(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Log and keep the loop alive; one bad batch must not stop exporting.
                System.err.println("Error exporting spans to Cassandra: " + e.getMessage());
            } finally {
                batch.clear();
            }
        }
    }

    /** Drains the whole queue (batch by batch), used by flush/shutdown. */
    private void exportBatch() {
        List<Span> batch = new ArrayList<>(batchSize);
        while (batchQueue.drainTo(batch, batchSize) > 0) {
            exportBatch(batch);
            batch.clear();
        }
    }

    /** Stores each span of a batch; failures are logged per span so the rest still land. */
    private void exportBatch(List<Span> batch) {
        if (batch.isEmpty()) {
            return;
        }
        for (Span span : batch) {
            try {
                repository.storeSpan(span);
            } catch (Exception e) {
                System.err.println("Failed to store span: " + e.getMessage());
                // Implement retry logic or dead letter queue in production
            }
        }
    }

    /**
     * Maps OTel SpanData onto the storage model.
     *
     * <p>OTel trace/span ids are hex strings (32/16 chars), not UUIDs, so they
     * are folded into deterministic type-3 UUIDs via nameUUIDFromBytes: the
     * same id always maps to the same UUID, keeping parent/child links intact.
     */
    private Span convertToDomainSpan(SpanData spanData) {
        Map<String, String> tags = new HashMap<>();
        spanData.getAttributes().forEach((key, value) ->
                tags.put(key.getKey(), value.toString()));
        // Standard OpenTelemetry attributes alongside the user attributes.
        tags.put("span.kind", spanData.getKind().name());
        tags.put("span.status", spanData.getStatus().getStatusCode().name());
        if (spanData.getStatus().getDescription() != null) {
            tags.put("status.description", spanData.getStatus().getDescription());
        }

        List<SpanLog> logs = new ArrayList<>();
        spanData.getEvents().forEach(event -> {
            Map<String, String> logFields = new HashMap<>();
            event.getAttributes().forEach((key, value) ->
                    logFields.put(key.getKey(), value.toString()));
            // Event timestamps are epoch nanoseconds; SpanLog expects an Instant.
            logs.add(new SpanLog(
                    Instant.ofEpochSecond(0, event.getEpochNanos()), event.getName(), logFields));
        });

        // Plain attribute-key lookup avoids a hard dependency on the semconv artifact.
        String serviceName = spanData.getResource().getAttributes()
                .get(AttributeKey.stringKey("service.name"));

        return new Span(
                uuidFromId(spanData.getTraceId()),
                uuidFromId(spanData.getSpanId()),
                spanData.getParentSpanContext().isValid()
                        ? uuidFromId(spanData.getParentSpanContext().getSpanId())
                        : null,
                serviceName,
                spanData.getName(),
                Instant.ofEpochSecond(0, spanData.getStartEpochNanos()),
                spanData.getEndEpochNanos() - spanData.getStartEpochNanos(),
                tags,
                logs);
    }

    /** Deterministically derives a UUID from an OTel hex id string. */
    private static UUID uuidFromId(String hexId) {
        return UUID.nameUUIDFromBytes(hexId.getBytes(StandardCharsets.UTF_8));
    }
}

5. Trace Analysis and Query Service

Trace Analysis Service:

package com.example.tracing.service;
import com.example.tracing.cassandra.CassandraSpanRepository;
import com.example.tracing.model.Span;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Read-side query/analysis facade over the span store: reconstructs trace
 * trees, searches recent traces per service, and derives a service map.
 */
public class TraceAnalysisService {
    private final CassandraSpanRepository repository;

    public TraceAnalysisService(CassandraSpanRepository repository) {
        this.repository = repository;
    }

    /** Loads all stored spans of one trace and assembles them into a parent/child tree. */
    public TraceTree getTraceTree(UUID traceId) {
        List<Span> spans = repository.findSpansByTraceId(traceId);
        return buildTraceTree(spans);
    }

    /**
     * Finds traces that touched {@code serviceName} in [from, to], summarized,
     * newest first, capped at {@code limit}.
     */
    public List<TraceSummary> findTraces(String serviceName, Instant from, Instant to, int limit) {
        List<Span> spans = repository.findSpansByService(serviceName, from, to);
        // Group by trace ID and create summaries
        Map<UUID, List<Span>> traces = spans.stream()
                .collect(Collectors.groupingBy(Span::getTraceId));
        return traces.entrySet().stream()
                .map(entry -> createTraceSummary(entry.getKey(), entry.getValue()))
                .sorted(Comparator.comparing(TraceSummary::getStartTime).reversed())
                .limit(limit)
                .collect(Collectors.toList());
    }

    /**
     * Builds a service map (nodes plus call edges) from raw span data, using
     * each span's "peer.service" tag as the call target.
     *
     * <p>NOTE(review): "%" is passed here as a literal service name — a CQL
     * equality clause has no LIKE-style wildcard, so this fetches spans only for
     * a service actually named "%". A real implementation should query the
     * service_dependencies table (or iterate known service names) — confirm.
     */
    public ServiceMap generateServiceMap(Instant from, Instant to) {
        // This would query the service_dependencies table
        // For simplicity, we'll generate from span data
        List<Span> spans = repository.findSpansByService("%", from, to);
        Map<String, ServiceNode> nodes = new HashMap<>();
        Map<String, Map<String, ServiceEdge>> edges = new HashMap<>();
        for (Span span : spans) {
            String serviceName = span.getServiceName();
            nodes.putIfAbsent(serviceName, new ServiceNode(serviceName));
            // Extract target service from tags or peer.service attribute
            String targetService = span.getTags().get("peer.service");
            if (targetService != null && !targetService.equals(serviceName)) {
                edges.putIfAbsent(serviceName, new HashMap<>());
                ServiceEdge edge = edges.get(serviceName).computeIfAbsent(targetService,
                        k -> new ServiceEdge(serviceName, targetService));
                edge.incrementCallCount();
                if (span.hasError()) {
                    edge.incrementErrorCount();
                }
            }
        }
        return new ServiceMap(new ArrayList<>(nodes.values()),
                edges.values().stream()
                        .flatMap(m -> m.values().stream())
                        .collect(Collectors.toList()));
    }

    /**
     * Summarizes one trace. Note: the reported duration is the longest single
     * span's duration (normally the root span), not wall-clock time from the
     * earliest start to the latest end across spans.
     */
    private TraceSummary createTraceSummary(UUID traceId, List<Span> spans) {
        Instant startTime = spans.stream()
                .map(Span::getStartTime)
                .min(Instant::compareTo)
                .orElse(Instant.now());
        long duration = spans.stream()
                .mapToLong(Span::getDuration)
                .max()
                .orElse(0);
        boolean hasErrors = spans.stream().anyMatch(Span::hasError);
        Set<String> services = spans.stream()
                .map(Span::getServiceName)
                .collect(Collectors.toSet());
        return new TraceSummary(traceId, startTime, duration, hasErrors, services, spans.size());
    }

    /**
     * Links spans into a tree via parent_span_id. The root is the span without
     * a parent; if that span was not stored (dropped or expired), the returned
     * tree carries a null root — callers should handle that case.
     */
    private TraceTree buildTraceTree(List<Span> spans) {
        Map<UUID, TraceTreeNode> nodeMap = new HashMap<>();
        TraceTreeNode root = null;
        // Create nodes
        for (Span span : spans) {
            TraceTreeNode node = new TraceTreeNode(span);
            nodeMap.put(span.getSpanId(), node);
            if (span.getParentSpanId() == null) {
                root = node;
            }
        }
        // Build tree
        for (Span span : spans) {
            if (span.getParentSpanId() != null) {
                TraceTreeNode parent = nodeMap.get(span.getParentSpanId());
                if (parent != null) {
                    parent.addChild(nodeMap.get(span.getSpanId()));
                }
            }
        }
        return new TraceTree(root);
    }
}
// Supporting classes
/**
 * Immutable roll-up of a single trace, used as a search result row.
 * Duration is the longest single span's duration in nanoseconds.
 */
class TraceSummary {
    private final UUID traceId;
    private final Instant startTime;
    private final long duration;
    private final boolean hasErrors;
    private final Set<String> services;
    private final int spanCount;

    TraceSummary(UUID traceId, Instant startTime, long duration, boolean hasErrors,
            Set<String> services, int spanCount) {
        this.traceId = traceId;
        this.startTime = startTime;
        this.duration = duration;
        this.hasErrors = hasErrors;
        // Defensive copy; null means "no services recorded".
        this.services = services != null ? new HashSet<>(services) : new HashSet<>();
        this.spanCount = spanCount;
    }

    public UUID getTraceId() {
        return traceId;
    }

    public Instant getStartTime() {
        return startTime;
    }

    public long getDuration() {
        return duration;
    }

    public boolean hasErrors() {
        return hasErrors;
    }

    /** Read-only view of the services that participated in the trace. */
    public Set<String> getServices() {
        return Collections.unmodifiableSet(services);
    }

    public int getSpanCount() {
        return spanCount;
    }
}
/**
 * Wrapper around the root node of a reconstructed trace. The root may be null
 * when the parentless span of the trace was not stored.
 */
class TraceTree {
    private final TraceTreeNode root;

    TraceTree(TraceTreeNode root) {
        this.root = root;
    }

    /** May return null — see class note. */
    public TraceTreeNode getRoot() {
        return root;
    }
}
/** One node in a trace tree: a span plus its child spans. */
class TraceTreeNode {
    private final Span span;
    private final List<TraceTreeNode> children = new ArrayList<>();

    TraceTreeNode(Span span) {
        this.span = span;
    }

    public Span getSpan() {
        return span;
    }

    /** Read-only view; use {@link #addChild} to mutate. */
    public List<TraceTreeNode> getChildren() {
        return Collections.unmodifiableList(children);
    }

    public void addChild(TraceTreeNode child) {
        children.add(child);
    }
}
/** Snapshot of the service topology: the services seen plus call edges between them. */
class ServiceMap {
    private final List<ServiceNode> nodes;
    private final List<ServiceEdge> edges;

    ServiceMap(List<ServiceNode> nodes, List<ServiceEdge> edges) {
        // Defensive copies; null means empty.
        this.nodes = nodes != null ? new ArrayList<>(nodes) : new ArrayList<>();
        this.edges = edges != null ? new ArrayList<>(edges) : new ArrayList<>();
    }

    public List<ServiceNode> getNodes() {
        return Collections.unmodifiableList(nodes);
    }

    public List<ServiceEdge> getEdges() {
        return Collections.unmodifiableList(edges);
    }
}
/** A single service appearing in the service map. */
class ServiceNode {
    private final String serviceName;

    ServiceNode(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceName() {
        return serviceName;
    }
}
/**
 * A directed call relationship between two services, with mutable call and
 * error counts accumulated while scanning spans. Not thread-safe; intended
 * for single-threaded aggregation.
 */
class ServiceEdge {
    private final String sourceService;
    private final String targetService;
    private long callCount;
    private long errorCount;

    ServiceEdge(String sourceService, String targetService) {
        this.sourceService = sourceService;
        this.targetService = targetService;
    }

    public String getSourceService() {
        return sourceService;
    }

    public String getTargetService() {
        return targetService;
    }

    public long getCallCount() {
        return callCount;
    }

    public long getErrorCount() {
        return errorCount;
    }

    public void incrementCallCount() {
        callCount++;
    }

    public void incrementErrorCount() {
        errorCount++;
    }
}

6. Spring Boot Configuration

Cassandra Configuration:

package com.example.tracing.config;
import com.datastax.oss.driver.api.core.CqlSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CassandraConfig {
@Value("${cassandra.contact-points:localhost}")
private String contactPoints;
@Value("${cassandra.port:9042}")
private int port;
@Value("${cassandra.keyspace:tracing}")
private String keyspace;
@Value("${cassandra.local-datacenter:datacenter1}")
private String localDatacenter;
@Bean
public CqlSession cqlSession() {
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(contactPoints, port))
.withLocalDatacenter(localDatacenter)
.withKeyspace(keyspace)
.build();
}
@Bean
public CassandraSpanRepository spanRepository(CqlSession session) {
return new CassandraSpanRepository(session);
}
@Bean
public TraceAnalysisService traceAnalysisService(CassandraSpanRepository repository) {
return new TraceAnalysisService(repository);
}
}

OpenTelemetry Configuration:

@Configuration
public class TracingConfig {
    /**
     * Primary exporter: writes spans to Cassandra in batches of up to 100,
     * flushing at least every 5 seconds.
     */
    @Bean
    public CassandraSpanExporter spanExporter(CassandraSpanRepository repository) {
        return new CassandraSpanExporter(repository, 100, 5000); // batch 100, 5s interval
    }

    /**
     * Fallback/debug exporter that just prints span names to stdout.
     *
     * <p>NOTE(review): both exporters are exposed as beans, but the wiring into
     * an SdkTracerProvider (e.g. via a BatchSpanProcessor) is not shown here —
     * confirm how the OpenTelemetry SDK is assembled elsewhere.
     */
    @Bean
    public SpanExporter loggingSpanExporter() {
        // Fallback exporter
        return new SpanExporter() {
            @Override
            public CompletableResultCode export(Collection<SpanData> spans) {
                spans.forEach(span ->
                        System.out.println("Exporting span: " + span.getName()));
                return CompletableResultCode.ofSuccess();
            }

            @Override
            public CompletableResultCode flush() {
                return CompletableResultCode.ofSuccess();
            }

            @Override
            public CompletableResultCode shutdown() {
                return CompletableResultCode.ofSuccess();
            }
        };
    }
}

7. Performance Optimization

Batch Writing and Async Operations:

@Component
public class OptimizedSpanWriter {
    private final CassandraSpanRepository repository;
    private final ExecutorService executorService;

    public OptimizedSpanWriter(CassandraSpanRepository repository) {
        this.repository = repository;
        // Plain ThreadFactory lambda instead of Guava's ThreadFactoryBuilder:
        // no extra dependency. Daemon threads so the pool never blocks JVM exit.
        this.executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName("cassandra-writer-" + thread.getId());
                    thread.setDaemon(true);
                    return thread;
                });
    }

    /**
     * Stores one span on the internal pool and returns a future that completes
     * when the write finished (or was swallowed after logging).
     *
     * <p>Note: Spring's {@code @Async} was removed — the method already
     * dispatches to its own executor, so @Async would only have double-wrapped
     * the call on Spring's task executor.
     */
    public CompletableFuture<Void> storeSpanAsync(Span span) {
        return CompletableFuture.runAsync(() -> {
            try {
                repository.storeSpan(span);
            } catch (Exception e) {
                // Handle exception, maybe store in dead letter queue
                System.err.println("Failed to store span asynchronously: " + e.getMessage());
            }
        }, executorService);
    }

    /** Submits each span asynchronously; writes proceed in parallel across the pool. */
    public void storeSpansInBatch(List<Span> spans) {
        // Use Cassandra batch statements for multiple spans from same trace
        spans.forEach(this::storeSpanAsync);
    }

    /**
     * Drains the pool on application shutdown (call from a @PreDestroy hook);
     * pending writes get up to 5 seconds to finish before a hard stop.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executorService.shutdownNow();
        }
    }
}

8. Best Practices

Data Retention and TTL:

-- Expire raw spans automatically after 30 days.
ALTER TABLE tracing.spans
WITH default_time_to_live = 2592000; -- 30 days

-- Long-term trace summaries, retained 90 days.
-- Note: day_bucket must be declared as a column before it can be used in the
-- primary key (the original DDL omitted it and would be rejected).
CREATE TABLE tracing.trace_summaries (
    trace_id uuid,
    day_bucket text,       -- ISO date (yyyy-MM-dd, UTC) partition key
    start_time timestamp,
    duration bigint,
    service_count int,
    has_errors boolean,
    PRIMARY KEY ((day_bucket), start_time, trace_id)
) WITH default_time_to_live = 7776000; -- 90 days

Monitoring and Metrics:

/**
 * Micrometer instrumentation for span storage: counts successes and failures
 * and records per-attempt latency.
 */
@Component
public class TracingMetrics {
    private final MeterRegistry meterRegistry;
    private final Counter spansStored;
    private final Counter spanStorageErrors;
    private final Timer spanStorageTimer;

    public TracingMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Register every meter eagerly so dashboards see the series before traffic arrives.
        this.spansStored = Counter.builder("tracing.spans.stored").register(meterRegistry);
        this.spanStorageErrors = Counter.builder("tracing.spans.storage.errors").register(meterRegistry);
        this.spanStorageTimer = Timer.builder("tracing.spans.storage.duration").register(meterRegistry);
    }

    /**
     * Records one storage attempt.
     *
     * @param duration attempt latency in milliseconds
     * @param success  true bumps the stored counter, false the error counter
     */
    public void recordSpanStorage(long duration, boolean success) {
        spanStorageTimer.record(duration, TimeUnit.MILLISECONDS);
        Counter outcome = success ? spansStored : spanStorageErrors;
        outcome.increment();
    }
}

Conclusion

Cassandra provides an excellent foundation for distributed tracing systems due to its write performance, scalability, and time-series capabilities. Key takeaways:

  1. Design for time-series: Use day-based partitioning for efficient querying
  2. Batch writes: Use asynchronous batching for better throughput
  3. Monitor performance: Track storage metrics and query performance
  4. Plan for retention: Use TTL to automatically manage data lifecycle
  5. Leverage OpenTelemetry: Integrate with standard tracing APIs

This implementation can handle high-volume tracing data while providing efficient query capabilities for trace analysis and debugging.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper