This guide presents a comprehensive observability implementation for Java microservices running in service mesh environments. It provides distributed tracing, metrics collection, log correlation, and service mesh integration.
Complete Implementation
1. Core Observability Manager
package com.observability.mesh;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.ResourceAttributes;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
/**
* Main observability manager for service mesh environments
*/
public class ServiceMeshObservability {
private static final String INSTRUMENTATION_NAME = "service-mesh-observability";
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final MetricsCollector metricsCollector;
private final DistributedLogger distributedLogger;
private final ServiceMeshIntegrator meshIntegrator;
private final ObservabilityConfig config;
private static volatile ServiceMeshObservability instance;
private ServiceMeshObservability(ObservabilityConfig config) {
this.config = config;
this.openTelemetry = initializeOpenTelemetry(config);
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_NAME, "1.0.0");
this.metricsCollector = new MetricsCollector(openTelemetry, config);
this.distributedLogger = new DistributedLogger(openTelemetry);
this.meshIntegrator = new ServiceMeshIntegrator(config);
initializeServiceMeshIntegration();
}
public static ServiceMeshObservability initialize(ObservabilityConfig config) {
if (instance == null) {
synchronized (ServiceMeshObservability.class) {
if (instance == null) {
instance = new ServiceMeshObservability(config);
}
}
}
return instance;
}
public static ServiceMeshObservability getInstance() {
if (instance == null) {
throw new IllegalStateException("ServiceMeshObservability not initialized. Call initialize() first.");
}
return instance;
}
private OpenTelemetry initializeOpenTelemetry(ObservabilityConfig config) {
// Create resource with service attributes
Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.builder()
.put(ResourceAttributes.SERVICE_NAME, config.getServiceName())
.put(ResourceAttributes.SERVICE_VERSION, config.getServiceVersion())
.put(ResourceAttributes.DEPLOYMENT_ENVIRONMENT, config.getEnvironment())
.put("service.mesh", config.getMeshName())
.put("service.cluster", config.getClusterName())
.build()));
// Configure span exporter
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(config.getOtlpEndpoint())
.setTimeout(config.getExportTimeout())
.build();
// Configure tracer provider
SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
.build();
// Configure metrics exporter
PrometheusHttpServer prometheusExporter = PrometheusHttpServer.builder()
.setPort(config.getMetricsPort())
.build();
// Build OpenTelemetry instance
return OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(
W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance()
)))
.buildAndRegisterGlobal();
}
private void initializeServiceMeshIntegration() {
meshIntegrator.initialize();
metricsCollector.registerMeshMetrics();
// Register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
/**
* Create a new span for a service operation
*/
public ObservabilityScope startOperation(String operationName, Map<String, String> attributes) {
Span span = tracer.spanBuilder(operationName)
.setSpanKind(SpanKind.SERVER)
.setAttribute("service.mesh.operation", operationName)
.setAllAttributes(Attributes.builder()
.putAll(convertToAttributes(attributes))
.build())
.startSpan();
Context context = Context.current().with(span);
return new ObservabilityScope(span, context, metricsCollector, distributedLogger, operationName);
}
/**
* Create a client span for outbound calls
*/
public ObservabilityScope startClientCall(String serviceName, String operation, Map<String, String> headers) {
Span span = tracer.spanBuilder(serviceName + "." + operation)
.setSpanKind(SpanKind.CLIENT)
.setAttribute("peer.service", serviceName)
.setAttribute("rpc.method", operation)
.setAttribute("service.mesh.direction", "outbound")
.startSpan();
// Inject headers for propagation
Map<String, String> propagationHeaders = new HashMap<>();
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
propagator.inject(Context.current().with(span), propagationHeaders, Map::put);
if (headers != null) {
propagationHeaders.putAll(headers);
}
Context context = Context.current().with(span);
return new ObservabilityScope(span, context, metricsCollector, distributedLogger, operation)
.withPropagationHeaders(propagationHeaders);
}
/**
* Extract context from incoming headers
*/
public Context extractContext(Map<String, String> headers) {
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
return propagator.extract(Context.current(), headers, (carrier, key) -> carrier.get(key.toLowerCase()));
}
/**
* Record business metrics
*/
public void recordBusinessMetric(String metricName, double value, Map<String, String> dimensions) {
metricsCollector.recordBusinessMetric(metricName, value, dimensions);
}
/**
* Record mesh-specific metrics
*/
public void recordMeshMetric(String metricName, double value, Map<String, String> meshDimensions) {
metricsCollector.recordMeshMetric(metricName, value, meshDimensions);
}
/**
* Log with distributed context
*/
public void logWithContext(String level, String message, Map<String, Object> fields) {
distributedLogger.log(level, message, fields);
}
/**
* Get service mesh insights
*/
public ServiceMeshInsights getMeshInsights() {
return meshIntegrator.collectInsights();
}
/**
* Check service mesh health
*/
public ServiceMeshHealth getMeshHealth() {
return meshIntegrator.checkHealth();
}
private Attributes convertToAttributes(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return Attributes.empty();
}
Attributes.Builder builder = Attributes.builder();
for (Map.Entry<String, String> entry : map.entrySet()) {
builder.put(entry.getKey(), entry.getValue());
}
return builder.build();
}
private void shutdown() {
metricsCollector.shutdown();
meshIntegrator.shutdown();
System.info("Service mesh observability shutdown completed");
}
// Getters
public OpenTelemetry getOpenTelemetry() { return openTelemetry; }
public Tracer getTracer() { return tracer; }
public MetricsCollector getMetricsCollector() { return metricsCollector; }
public DistributedLogger getDistributedLogger() { return distributedLogger; }
public ServiceMeshIntegrator getMeshIntegrator() { return meshIntegrator; }
}
2. Observability Configuration
/**
 * Observability configuration.
 *
 * <p>Immutable value object created through {@link #builder()}. The custom attribute map is
 * copied when the configuration is built, so later changes to the builder or to a map passed
 * into it do not affect an existing configuration.
 */
public class ObservabilityConfig {
    private final String serviceName;
    private final String serviceVersion;
    private final String environment;
    private final String meshName;
    private final String clusterName;
    private final String otlpEndpoint;
    private final int metricsPort;
    private final Duration exportTimeout;
    private final boolean enableTracing;
    private final boolean enableMetrics;
    private final boolean enableLogging;
    private final Map<String, String> customAttributes;

    private ObservabilityConfig(Builder builder) {
        this.serviceName = builder.serviceName;
        this.serviceVersion = builder.serviceVersion;
        this.environment = builder.environment;
        this.meshName = builder.meshName;
        this.clusterName = builder.clusterName;
        this.otlpEndpoint = builder.otlpEndpoint;
        this.metricsPort = builder.metricsPort;
        this.exportTimeout = builder.exportTimeout;
        this.enableTracing = builder.enableTracing;
        this.enableMetrics = builder.enableMetrics;
        this.enableLogging = builder.enableLogging;
        // Snapshot the map: the builder stays usable after build() without changing this object.
        this.customAttributes = Collections.unmodifiableMap(new HashMap<>(builder.customAttributes));
    }

    // Getters
    public String getServiceName() { return serviceName; }
    public String getServiceVersion() { return serviceVersion; }
    public String getEnvironment() { return environment; }
    public String getMeshName() { return meshName; }
    public String getClusterName() { return clusterName; }
    public String getOtlpEndpoint() { return otlpEndpoint; }
    public int getMetricsPort() { return metricsPort; }
    public Duration getExportTimeout() { return exportTimeout; }
    public boolean isEnableTracing() { return enableTracing; }
    public boolean isEnableMetrics() { return enableMetrics; }
    public boolean isEnableLogging() { return enableLogging; }

    /** Returns a read-only view of the custom attributes; never null. */
    public Map<String, String> getCustomAttributes() { return customAttributes; }

    /** Creates a builder pre-populated with the default values. */
    public static Builder builder() {
        return new Builder();
    }

    /** Fluent builder for {@link ObservabilityConfig}; every setter returns this builder. */
    public static class Builder {
        private String serviceName = "unknown-service";
        private String serviceVersion = "1.0.0";
        private String environment = "development";
        private String meshName = "istio";
        private String clusterName = "default";
        private String otlpEndpoint = "http://localhost:4317";
        private int metricsPort = 9464;
        private Duration exportTimeout = Duration.ofSeconds(30);
        private boolean enableTracing = true;
        private boolean enableMetrics = true;
        private boolean enableLogging = true;
        private Map<String, String> customAttributes = new HashMap<>();

        public Builder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        public Builder serviceVersion(String serviceVersion) {
            this.serviceVersion = serviceVersion;
            return this;
        }

        public Builder environment(String environment) {
            this.environment = environment;
            return this;
        }

        public Builder meshName(String meshName) {
            this.meshName = meshName;
            return this;
        }

        public Builder clusterName(String clusterName) {
            this.clusterName = clusterName;
            return this;
        }

        public Builder otlpEndpoint(String otlpEndpoint) {
            this.otlpEndpoint = otlpEndpoint;
            return this;
        }

        public Builder metricsPort(int metricsPort) {
            this.metricsPort = metricsPort;
            return this;
        }

        public Builder exportTimeout(Duration exportTimeout) {
            this.exportTimeout = exportTimeout;
            return this;
        }

        public Builder enableTracing(boolean enableTracing) {
            this.enableTracing = enableTracing;
            return this;
        }

        public Builder enableMetrics(boolean enableMetrics) {
            this.enableMetrics = enableMetrics;
            return this;
        }

        public Builder enableLogging(boolean enableLogging) {
            this.enableLogging = enableLogging;
            return this;
        }

        /**
         * Replaces all custom attributes with a copy of the given map.
         *
         * @param customAttributes attributes to copy; null clears the attributes
         */
        public Builder customAttributes(Map<String, String> customAttributes) {
            // Copy so a null or immutable argument cannot break later customAttribute() calls
            // and so the caller's map is never modified through this builder.
            this.customAttributes =
                    customAttributes == null ? new HashMap<>() : new HashMap<>(customAttributes);
            return this;
        }

        /** Adds or replaces a single custom attribute. */
        public Builder customAttribute(String key, String value) {
            this.customAttributes.put(key, value);
            return this;
        }

        /** Builds an immutable configuration from the current builder state. */
        public ObservabilityConfig build() {
            return new ObservabilityConfig(this);
        }
    }
}
/**
* Observability scope for managing traces and context
*/
public class ObservabilityScope implements AutoCloseable {
private final Span span;
private final Context context;
private final MetricsCollector metricsCollector;
private final DistributedLogger logger;
private final String operationName;
private final long startTime;
private Map<String, String> propagationHeaders;
private boolean completed = false;
public ObservabilityScope(Span span, Context context, MetricsCollector metricsCollector,
DistributedLogger logger, String operationName) {
this.span = span;
this.context = context;
this.metricsCollector = metricsCollector;
this.logger = logger;
this.operationName = operationName;
this.startTime = System.currentTimeMillis();
this.propagationHeaders = new HashMap<>();
// Make this context current
io.opentelemetry.context.Scope scope = context.makeCurrent();
// Scope should be closed by the caller when they are done with the context
}
public ObservabilityScope withPropagationHeaders(Map<String, String> headers) {
this.propagationHeaders = headers;
return this;
}
public Map<String, String> getPropagationHeaders() {
return new HashMap<>(propagationHeaders);
}
public void addEvent(String name, Map<String, String> attributes) {
span.addEvent(name, convertToAttributes(attributes));
}
public void setAttribute(String key, String value) {
span.setAttribute(key, value);
}
public void setAttributes(Map<String, String> attributes) {
for (Map.Entry<String, String> entry : attributes.entrySet()) {
span.setAttribute(entry.getKey(), entry.getValue());
}
}
public void recordMetric(String metricName, double value, Map<String, String> dimensions) {
metricsCollector.recordBusinessMetric(metricName, value, dimensions);
}
public void log(String level, String message, Map<String, Object> fields) {
logger.log(level, message, fields);
}
public void success() {
if (!completed) {
span.setAttribute("success", true);
recordLatencyMetric(true);
completed = true;
}
}
public void error(Throwable throwable) {
if (!completed) {
span.recordException(throwable);
span.setAttribute("error", true);
span.setAttribute("error.type", throwable.getClass().getSimpleName());
span.setAttribute("error.message", throwable.getMessage());
recordLatencyMetric(false);
completed = true;
}
}
public void error(String errorCode, String errorMessage) {
if (!completed) {
span.setAttribute("error", true);
span.setAttribute("error.code", errorCode);
span.setAttribute("error.message", errorMessage);
recordLatencyMetric(false);
completed = true;
}
}
@Override
public void close() {
if (!completed) {
// If not explicitly marked as success/error, assume success
success();
}
span.end();
// Record operation metrics
long duration = System.currentTimeMillis() - startTime;
metricsCollector.recordOperationMetric(operationName, duration, completed);
}
private void recordLatencyMetric(boolean success) {
long duration = System.currentTimeMillis() - startTime;
Map<String, String> dimensions = new HashMap<>();
dimensions.put("operation", operationName);
dimensions.put("success", String.valueOf(success));
metricsCollector.recordBusinessMetric("operation.latency", duration, dimensions);
}
private Attributes convertToAttributes(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return Attributes.empty();
}
Attributes.Builder builder = Attributes.builder();
for (Map.Entry<String, String> entry : map.entrySet()) {
builder.put(entry.getKey(), entry.getValue());
}
return builder.build();
}
// Getters
public Span getSpan() { return span; }
public Context getContext() { return context; }
public String getOperationName() { return operationName; }
public String getTraceId() { return span.getSpanContext().getTraceId(); }
public String getSpanId() { return span.getSpanContext().getSpanId(); }
}
3. Metrics Collection
/**
 * Metrics collector for service mesh observability.
 *
 * <p>Owns the OpenTelemetry instruments used by the service, keyed by metric name, and runs a
 * background task every 30 seconds that records JVM and mesh metrics. Instruments are kept in
 * concurrent maps and created with {@code computeIfAbsent}, so the first use of an unknown
 * metric name from several threads creates it only once.
 */
public class MetricsCollector {
    private final OpenTelemetry openTelemetry;
    private final ObservabilityConfig config;
    /** Meter named after the service; every instrument of this collector is created from it. */
    private final io.opentelemetry.api.metrics.Meter meter;
    private final Map<String, io.opentelemetry.api.metrics.LongCounter> counters;
    private final Map<String, io.opentelemetry.api.metrics.DoubleHistogram> histograms;
    private final Map<String, io.opentelemetry.api.metrics.LongUpDownCounter> upDownCounters;
    private final ScheduledExecutorService metricsExecutor;

    public MetricsCollector(OpenTelemetry openTelemetry, ObservabilityConfig config) {
        this.openTelemetry = openTelemetry;
        this.config = config;
        this.meter = openTelemetry.getMeter(config.getServiceName());
        this.counters = new ConcurrentHashMap<>();
        this.histograms = new ConcurrentHashMap<>();
        this.upDownCounters = new ConcurrentHashMap<>();
        // Daemon thread: a non-daemon scheduler thread would keep the JVM alive after main()
        // returns, and the shutdown hook that stops it only runs once the JVM is exiting.
        this.metricsExecutor = Executors.newScheduledThreadPool(1, task -> {
            Thread worker = new Thread(task, "observability-metrics");
            worker.setDaemon(true);
            return worker;
        });
        initializeDefaultMetrics();
        startPeriodicMetricsCollection();
    }

    private void initializeDefaultMetrics() {
        // HTTP request metrics
        createCounter("http_requests_total", "Total HTTP requests", "requests");
        createHistogram("http_request_duration_seconds", "HTTP request duration in seconds", "seconds");
        createCounter("http_errors_total", "Total HTTP errors", "errors");
        // Business metrics
        createCounter("business_operations_total", "Total business operations", "operations");
        createHistogram("business_operation_duration_ms", "Business operation duration", "milliseconds");
        // Outcome counters incremented by recordOperationMetric
        createCounter("operations_success_total", "Total successful operations", "operations");
        createCounter("operations_failed_total", "Total failed operations", "operations");
        // System metrics
        createUpDownCounter("active_connections", "Active connections", "connections");
        createGauge("memory_usage_bytes", "Memory usage in bytes", "bytes");
        createGauge("cpu_usage_percent", "CPU usage percentage", "percent");
    }

    /** Registers the service mesh specific instruments. */
    public void registerMeshMetrics() {
        createCounter("mesh_requests_total", "Total service mesh requests", "requests");
        createHistogram("mesh_request_duration_seconds", "Service mesh request duration", "seconds");
        createCounter("mesh_errors_total", "Total service mesh errors", "errors");
        createCounter("mesh_retries_total", "Total service mesh retries", "retries");
        createCounter("mesh_circuit_breaker_opens", "Circuit breaker open events", "events");
        createGauge("mesh_active_endpoints", "Active endpoints in mesh", "endpoints");
    }

    /**
     * Records a value into the histogram named {@code metricName}, creating it on first use.
     * Failures are logged to stderr and never propagate to the caller.
     *
     * @param dimensions attribute key/value pairs; may be null
     */
    public void recordBusinessMetric(String metricName, double value, Map<String, String> dimensions) {
        try {
            createHistogram(metricName, "Business metric: " + metricName, "units")
                    .record(value, convertToAttributes(dimensions));
        } catch (Exception e) {
            System.err.println("Error recording business metric: " + e.getMessage());
        }
    }

    /**
     * Records a mesh metric, adding the mesh name, service name and cluster as dimensions.
     *
     * @param meshDimensions extra dimensions; may be null
     */
    public void recordMeshMetric(String metricName, double value, Map<String, String> meshDimensions) {
        try {
            Map<String, String> dimensions =
                    meshDimensions == null ? new HashMap<>() : new HashMap<>(meshDimensions);
            dimensions.put("mesh.name", config.getMeshName());
            dimensions.put("service.name", config.getServiceName());
            dimensions.put("cluster", config.getClusterName());
            recordBusinessMetric(metricName, value, dimensions);
        } catch (Exception e) {
            System.err.println("Error recording mesh metric: " + e.getMessage());
        }
    }

    /** Records the duration of an operation and increments its success or failure counter. */
    public void recordOperationMetric(String operationName, long duration, boolean success) {
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("operation", operationName);
        dimensions.put("success", String.valueOf(success));
        dimensions.put("service", config.getServiceName());
        recordBusinessMetric("operation.duration_ms", duration, dimensions);
        // Increment operation counter
        String counterName = success ? "operations_success_total" : "operations_failed_total";
        incrementCounter(counterName, 1, dimensions);
    }

    /**
     * Adds {@code increment} to the counter named {@code counterName}, creating it on first use
     * so that increments to a not-yet-registered counter are not silently lost.
     */
    public void incrementCounter(String counterName, long increment, Map<String, String> dimensions) {
        try {
            createCounter(counterName, "Counter: " + counterName, "units")
                    .add(increment, convertToAttributes(dimensions));
        } catch (Exception e) {
            System.err.println("Error incrementing counter: " + e.getMessage());
        }
    }

    /** Records into an already registered histogram; unknown names are ignored. */
    public void recordHistogram(String histogramName, double value, Map<String, String> dimensions) {
        try {
            io.opentelemetry.api.metrics.DoubleHistogram histogram = histograms.get(histogramName);
            if (histogram != null) {
                histogram.record(value, convertToAttributes(dimensions));
            }
        } catch (Exception e) {
            System.err.println("Error recording histogram: " + e.getMessage());
        }
    }

    /** Returns the counter registered under {@code name}, building it if absent. */
    private io.opentelemetry.api.metrics.LongCounter createCounter(String name, String description, String unit) {
        return counters.computeIfAbsent(name, key -> meter
                .counterBuilder(key)
                .setDescription(description)
                .setUnit(unit)
                .build());
    }

    /** Returns the histogram registered under {@code name}, building it if absent. */
    private io.opentelemetry.api.metrics.DoubleHistogram createHistogram(String name, String description, String unit) {
        return histograms.computeIfAbsent(name, key -> meter
                .histogramBuilder(key)
                .setDescription(description)
                .setUnit(unit)
                .build());
    }

    /** Returns the up/down counter registered under {@code name}, building it if absent. */
    private io.opentelemetry.api.metrics.LongUpDownCounter createUpDownCounter(String name, String description, String unit) {
        return upDownCounters.computeIfAbsent(name, key -> meter
                .upDownCounterBuilder(key)
                .setDescription(description)
                .setUnit(unit)
                .build());
    }

    private void createGauge(String name, String description, String unit) {
        // Gauges are asynchronous instruments: the callback supplies the current value
        // whenever the SDK collects metrics.
        meter.gaugeBuilder(name)
                .setDescription(description)
                .setUnit(unit)
                .buildWithCallback(measurement -> measurement.record(collectGaugeValue(name)));
    }

    private double collectGaugeValue(String gaugeName) {
        switch (gaugeName) {
            case "memory_usage_bytes":
                return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            case "cpu_usage_percent":
                return getProcessCpuLoad();
            case "mesh_active_endpoints":
                return getActiveEndpointsCount();
            default:
                return 0.0;
        }
    }

    /**
     * Returns the CPU load of this JVM process as a percentage (0-100).
     * Returns 0 when the platform MXBean does not expose the value or reports it as unavailable.
     */
    private double getProcessCpuLoad() {
        java.lang.management.OperatingSystemMXBean osBean =
                java.lang.management.ManagementFactory.getOperatingSystemMXBean();
        if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
            double load = ((com.sun.management.OperatingSystemMXBean) osBean).getProcessCpuLoad();
            // The MXBean reports a fraction in [0, 1], or a negative value when unavailable.
            return load > 0 ? load * 100.0 : 0.0;
        }
        return 0.0;
    }

    private double getActiveEndpointsCount() {
        // This would integrate with service mesh control plane
        // For now, return a placeholder value
        return 5.0; // Placeholder
    }

    private void startPeriodicMetricsCollection() {
        metricsExecutor.scheduleAtFixedRate(() -> {
            // Catch everything: an exception escaping the task would cancel all future runs.
            try {
                collectSystemMetrics();
                collectMeshMetrics();
            } catch (Exception e) {
                System.err.println("Error in periodic metrics collection: " + e.getMessage());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    private void collectSystemMetrics() {
        // Collect JVM metrics
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("service", config.getServiceName());
        recordBusinessMetric("jvm.memory.used", usedMemory, dimensions);
        recordBusinessMetric("jvm.memory.max", maxMemory, dimensions);
        recordBusinessMetric("jvm.threads.active", (double) Thread.activeCount(), dimensions);
    }

    private void collectMeshMetrics() {
        // Collect service mesh specific metrics
        // This would integrate with service mesh APIs
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("mesh", config.getMeshName());
        dimensions.put("service", config.getServiceName());
        dimensions.put("cluster", config.getClusterName());
        // Placeholder mesh metrics
        recordMeshMetric("mesh.endpoints.healthy", 4.0, dimensions);
        recordMeshMetric("mesh.endpoints.total", 5.0, dimensions);
        recordMeshMetric("mesh.request.rate", 100.0, dimensions);
    }

    private Attributes convertToAttributes(Map<String, String> dimensions) {
        if (dimensions == null || dimensions.isEmpty()) {
            return Attributes.empty();
        }
        Attributes.Builder builder = Attributes.builder();
        for (Map.Entry<String, String> entry : dimensions.entrySet()) {
            builder.put(entry.getKey(), entry.getValue());
        }
        return builder.build();
    }

    /** Stops the periodic task, waiting up to 5 seconds before forcing termination. */
    public void shutdown() {
        metricsExecutor.shutdown();
        try {
            if (!metricsExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                metricsExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            metricsExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
4. Distributed Logging
/**
 * Distributed context-aware logger.
 *
 * <p>Writes each entry as a JSON document through SLF4J, tagged with the trace and span id of
 * the current span, and also attaches the entry to that span as a {@code log} event.
 */
public class DistributedLogger {
    /** Shared JSON serializer; building a new ObjectMapper for every log call is expensive. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    private static final String DEFAULT_LEVEL = "info";

    // Stored but not read by the code in this class.
    private final OpenTelemetry openTelemetry;
    private final org.slf4j.Logger slf4jLogger;

    public DistributedLogger(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        this.slf4jLogger = org.slf4j.LoggerFactory.getLogger("service-mesh-observability");
    }

    public void log(String level, String message, Map<String, Object> fields) {
        log(level, message, fields, null);
    }

    /**
     * Writes one structured log entry.
     *
     * @param level one of error, warn, info, debug, trace (case-insensitive); null or any other
     *     value is logged at info
     * @param message the log message
     * @param fields extra entries merged into the JSON document; may be null
     * @param throwable optional exception passed to SLF4J at whichever level is used; may be null
     */
    public void log(String level, String message, Map<String, Object> fields, Throwable throwable) {
        String effectiveLevel = level != null ? level : DEFAULT_LEVEL;

        // Get current span context
        Span currentSpan = Span.current();
        String traceId = currentSpan.getSpanContext().getTraceId();
        String spanId = currentSpan.getSpanContext().getSpanId();

        // Create structured log entry
        Map<String, Object> logEntry = new HashMap<>();
        logEntry.put("timestamp", Instant.now().toString());
        logEntry.put("level", effectiveLevel);
        logEntry.put("message", message);
        logEntry.put("trace_id", traceId);
        logEntry.put("span_id", spanId);
        if (fields != null) {
            logEntry.putAll(fields);
        }
        // Add span attributes if available
        addSpanAttributes(logEntry, currentSpan);

        // Locale.ROOT keeps level matching independent of the JVM default locale.
        emit(effectiveLevel.toLowerCase(Locale.ROOT), convertToJson(logEntry), throwable);

        // Also record log as span event
        currentSpan.addEvent("log", Attributes.builder()
                .put("log.level", effectiveLevel)
                .put("log.message", message)
                .build());
    }

    /** Sends the entry to SLF4J at the requested level, including the throwable when present. */
    private void emit(String normalizedLevel, String jsonLog, Throwable throwable) {
        switch (normalizedLevel) {
            case "error":
                if (throwable != null) {
                    slf4jLogger.error(jsonLog, throwable);
                } else {
                    slf4jLogger.error(jsonLog);
                }
                break;
            case "warn":
                if (throwable != null) {
                    slf4jLogger.warn(jsonLog, throwable);
                } else {
                    slf4jLogger.warn(jsonLog);
                }
                break;
            case "debug":
                if (throwable != null) {
                    slf4jLogger.debug(jsonLog, throwable);
                } else {
                    slf4jLogger.debug(jsonLog);
                }
                break;
            case "trace":
                if (throwable != null) {
                    slf4jLogger.trace(jsonLog, throwable);
                } else {
                    slf4jLogger.trace(jsonLog);
                }
                break;
            case "info":
            default:
                if (throwable != null) {
                    slf4jLogger.info(jsonLog, throwable);
                } else {
                    slf4jLogger.info(jsonLog);
                }
        }
    }

    private void addSpanAttributes(Map<String, Object> logEntry, Span span) {
        // This is a simplified implementation
        // In a real scenario, you might extract custom span attributes
        logEntry.put("service.name", getServiceNameFromContext());
        logEntry.put("service.mesh", getMeshNameFromContext());
    }

    private String getServiceNameFromContext() {
        // Extract from context or configuration
        return "unknown-service";
    }

    private String getMeshNameFromContext() {
        // Extract from context or configuration
        return "unknown-mesh";
    }

    private String convertToJson(Map<String, Object> map) {
        try {
            return JSON_MAPPER.writeValueAsString(map);
        } catch (Exception e) {
            // Fallback to simple string representation
            return map.toString();
        }
    }

    // Convenience methods
    public void debug(String message, Map<String, Object> fields) {
        log("debug", message, fields);
    }

    public void info(String message, Map<String, Object> fields) {
        log("info", message, fields);
    }

    public void warn(String message, Map<String, Object> fields) {
        log("warn", message, fields);
    }

    public void warn(String message, Map<String, Object> fields, Throwable throwable) {
        log("warn", message, fields, throwable);
    }

    public void error(String message, Map<String, Object> fields) {
        log("error", message, fields);
    }

    public void error(String message, Map<String, Object> fields, Throwable throwable) {
        log("error", message, fields, throwable);
    }

    /**
     * MDC (Mapped Diagnostic Context) support for SLF4J.
     *
     * <p>Runs {@code operation} with the MDC replaced by {@code context} plus, when the current
     * span is valid, its {@code trace_id} and {@code span_id}. The previous MDC content is
     * restored afterwards, also when the operation throws.
     *
     * @param context MDC entries for the duration of the operation; may be null
     * @param operation the code to run
     */
    public void withContext(Map<String, String> context, Runnable operation) {
        Map<String, String> previousContext = org.slf4j.MDC.getCopyOfContextMap();
        try {
            // Clear and set new context
            org.slf4j.MDC.clear();
            if (context != null) {
                for (Map.Entry<String, String> entry : context.entrySet()) {
                    org.slf4j.MDC.put(entry.getKey(), entry.getValue());
                }
            }
            // Add tracing context
            Span currentSpan = Span.current();
            if (currentSpan.getSpanContext().isValid()) {
                org.slf4j.MDC.put("trace_id", currentSpan.getSpanContext().getTraceId());
                org.slf4j.MDC.put("span_id", currentSpan.getSpanContext().getSpanId());
            }
            operation.run();
        } finally {
            // Restore previous context
            org.slf4j.MDC.clear();
            if (previousContext != null) {
                org.slf4j.MDC.setContextMap(previousContext);
            }
        }
    }
}
5. Service Mesh Integration
/**
* Service mesh integration for advanced observability
*/
public class ServiceMeshIntegrator {
private final ObservabilityConfig config;
private final MeshClient meshClient;
private final ScheduledExecutorService meshExecutor;
private final Map<String, Object> meshState;
/**
 * Creates the integrator: builds the mesh client from {@code config} and prepares a two-thread
 * scheduler for the background tasks. Nothing is scheduled until initialize() is called.
 */
public ServiceMeshIntegrator(ObservabilityConfig config) {
    this.config = config;
    this.meshClient = createMeshClient(config);
    // Daemon threads: non-daemon scheduler threads would keep the JVM alive after main()
    // returns, so a shutdown that is only triggered at JVM exit would never run.
    this.meshExecutor = Executors.newScheduledThreadPool(2, task -> {
        Thread worker = new Thread(task, "service-mesh-integrator");
        worker.setDaemon(true);
        return worker;
    });
    this.meshState = new ConcurrentHashMap<>();
}
/**
 * Starts the integrator: schedules the periodic mesh state collection and health monitoring
 * tasks, then registers this service with the mesh.
 */
public void initialize() {
startMeshStateCollection();
startHealthMonitoring();
registerServiceWithMesh();
}
/**
 * Assembles a combined insights report from the traffic, topology, security and performance
 * collectors, in that order.
 *
 * @return the assembled insights, or {@code ServiceMeshInsights.empty()} if assembly fails
 */
public ServiceMeshInsights collectInsights() {
    try {
        ServiceMeshInsights.InsightsBuilder insights = ServiceMeshInsights.builder();
        insights.trafficInsights(collectTrafficInsights());
        insights.topologyInsights(collectTopologyInsights());
        insights.securityInsights(collectSecurityInsights());
        insights.performanceInsights(collectPerformanceInsights());
        return insights.build();
    } catch (Exception failure) {
        System.err.println("Error collecting mesh insights: " + failure.getMessage());
        return ServiceMeshInsights.empty();
    }
}
/**
 * Runs the control plane, data plane, per-service and policy enforcement checks, in that order,
 * and combines them into one health report.
 *
 * @return the combined report, or an unhealthy report carrying the error message if a check
 *     throws
 */
public ServiceMeshHealth checkHealth() {
    try {
        ServiceMeshHealth.HealthBuilder report = ServiceMeshHealth.builder();
        report.controlPlaneHealthy(checkControlPlaneHealth());
        report.dataPlaneHealthy(checkDataPlaneHealth());
        report.serviceHealth(checkServiceHealth());
        report.policiesEnforced(checkPolicyEnforcement());
        return report.build();
    } catch (Exception failure) {
        System.err.println("Error checking mesh health: " + failure.getMessage());
        return ServiceMeshHealth.unhealthy(failure.getMessage());
    }
}
/**
 * Fetches the mesh configuration from the mesh client.
 *
 * @return the configuration reported by the client, or an empty map if the lookup fails
 */
public Map<String, Object> getMeshConfig() {
    Map<String, Object> meshConfiguration;
    try {
        meshConfiguration = meshClient.getMeshConfiguration();
    } catch (Exception failure) {
        System.err.println("Error getting mesh configuration: " + failure.getMessage());
        meshConfiguration = Collections.emptyMap();
    }
    return meshConfiguration;
}
/**
 * Looks up the mesh endpoints of a service through the mesh client.
 *
 * @param serviceName name of the service whose endpoints are requested
 * @return the endpoints reported by the client, or an empty list if the lookup fails
 */
public List<MeshEndpoint> getEndpoints(String serviceName) {
    try {
        return meshClient.getEndpoints(serviceName);
    } catch (Exception e) {
        // Include the cause, as the other lookups in this class do; the service name alone
        // does not say why the lookup failed.
        System.err.println("Error getting endpoints for service: " + serviceName + ": " + e.getMessage());
        return Collections.emptyList();
    }
}
/**
 * Schedules the mesh state refresh: first run immediately, then every 60 seconds.
 * Exceptions are caught inside the task so one failed run does not cancel later runs.
 */
private void startMeshStateCollection() {
    Runnable refreshTask = () -> {
        try {
            updateMeshState();
        } catch (Exception failure) {
            System.err.println("Error updating mesh state: " + failure.getMessage());
        }
    };
    meshExecutor.scheduleAtFixedRate(refreshTask, 0, 60, TimeUnit.SECONDS);
}
/**
 * Schedules the mesh health check: first run immediately, then every 30 seconds.
 * Each result is stored in the mesh state under "health"; an unhealthy result is also reported
 * on stderr. Exceptions are caught inside the task so one failed run does not cancel later runs.
 */
private void startHealthMonitoring() {
    meshExecutor.scheduleAtFixedRate(() -> {
        try {
            ServiceMeshHealth health = checkHealth();
            meshState.put("health", health);
            if (!health.isHealthy()) {
                // java.lang.System has no warn(); report the warning on stderr like the
                // other diagnostics in this class.
                System.err.println("Service mesh health check failed: " + health.getHealthStatus());
            }
        } catch (Exception e) {
            System.err.println("Error in health monitoring: " + e.getMessage());
        }
    }, 0, 30, TimeUnit.SECONDS);
}
/**
 * Registers this service with the mesh, sending its name, version, environment and cluster.
 * A failed registration is reported on stderr and does not propagate to the caller.
 */
private void registerServiceWithMesh() {
    try {
        Map<String, Object> registration = new HashMap<>();
        registration.put("serviceName", config.getServiceName());
        registration.put("version", config.getServiceVersion());
        registration.put("environment", config.getEnvironment());
        registration.put("cluster", config.getClusterName());
        meshClient.registerService(registration);
        // java.lang.System has no info(); write the confirmation to stdout.
        System.out.println("Service registered with mesh: " + config.getServiceName());
    } catch (Exception e) {
        System.err.println("Error registering service with mesh: " + e.getMessage());
    }
}
/**
 * Refreshes the cached mesh state: this service's endpoints, then the traffic metrics, then
 * the policies, stored under "endpoints", "traffic" and "policies". If any step throws, the
 * error is reported on stderr and the remaining steps are skipped.
 */
private void updateMeshState() {
    try {
        meshState.put("endpoints", getEndpoints(config.getServiceName()));
        meshState.put("traffic", meshClient.getTrafficMetrics());
        meshState.put("policies", meshClient.getPolicies());
    } catch (Exception failure) {
        System.err.println("Error updating mesh state: " + failure.getMessage());
    }
}
/**
 * Builds traffic insights from the cached {@code "traffic"} entry of the mesh state.
 *
 * @return the insights, or {@code TrafficInsights.empty()} when no traffic data is cached
 *         or anything goes wrong while reading it
 */
private TrafficInsights collectTrafficInsights() {
    try {
        Object cached = meshState.get("traffic");
        if (cached == null) {
            return TrafficInsights.empty();
        }
        Map<String, Object> metrics = (Map<String, Object>) cached;
        double requestRate = getDouble(metrics, "requestRate", 0.0);
        double errorRate = getDouble(metrics, "errorRate", 0.0);
        double p99Latency = getDouble(metrics, "p99Latency", 0.0);
        double throughput = getDouble(metrics, "throughput", 0.0);
        return TrafficInsights.builder()
            .requestRate(requestRate)
            .errorRate(errorRate)
            .p99Latency(p99Latency)
            .throughput(throughput)
            .build();
    } catch (Exception ignored) {
        // Insights are best-effort; any failure degrades to the empty value.
        return TrafficInsights.empty();
    }
}
/**
 * Builds topology insights from the cached {@code "endpoints"} entry of the mesh state and
 * the topology helper methods.
 *
 * @return the insights, or {@code TopologyInsights.empty()} if anything goes wrong
 */
private TopologyInsights collectTopologyInsights() {
    try {
        @SuppressWarnings("unchecked")
        List<MeshEndpoint> knownEndpoints = (List<MeshEndpoint>) meshState.get("endpoints");
        // No cached endpoints counts as zero.
        int endpointTotal = 0;
        if (knownEndpoints != null) {
            endpointTotal = knownEndpoints.size();
        }
        return TopologyInsights.builder()
            .serviceCount(getServiceCount())
            .endpointCount(endpointTotal)
            .connectedServices(getConnectedServices())
            .meshDensity(calculateMeshDensity())
            .build();
    } catch (Exception ignored) {
        // Insights are best-effort; any failure degrades to the empty value.
        return TopologyInsights.empty();
    }
}
/**
 * Builds security insights from the cached {@code "policies"} entry of the mesh state.
 *
 * @return the insights, or {@code SecurityInsights.empty()} when no policies are cached
 *         or anything goes wrong while reading them
 */
private SecurityInsights collectSecurityInsights() {
    try {
        @SuppressWarnings("unchecked")
        Map<String, Object> policies = (Map<String, Object>) meshState.get("policies");
        // Explicit guard instead of relying on a NullPointerException being caught below.
        if (policies == null) {
            return SecurityInsights.empty();
        }
        return SecurityInsights.builder()
            .mtlsEnabled(getBoolean(policies, "mtlsEnabled", false))
            .authPolicies(getInt(policies, "authPolicies", 0))
            .networkPolicies(getInt(policies, "networkPolicies", 0))
            .securityScore(calculateSecurityScore(policies))
            .build();
    } catch (Exception e) {
        // Insights are best-effort; any failure degrades to the empty value.
        return SecurityInsights.empty();
    }
}
/**
 * Builds performance insights from the cached {@code "traffic"} entry of the mesh state.
 *
 * @return the insights, or {@code PerformanceInsights.empty()} when no traffic data is cached
 *         or anything goes wrong while reading it
 */
private PerformanceInsights collectPerformanceInsights() {
    try {
        @SuppressWarnings("unchecked")
        Map<String, Object> trafficData = (Map<String, Object>) meshState.get("traffic");
        // Same explicit guard as collectTrafficInsights, instead of catching the NPE.
        if (trafficData == null) {
            return PerformanceInsights.empty();
        }
        return PerformanceInsights.builder()
            .averageLatency(getDouble(trafficData, "avgLatency", 0.0))
            .throughput(getDouble(trafficData, "throughput", 0.0))
            .errorRate(getDouble(trafficData, "errorRate", 0.0))
            .saturation(getDouble(trafficData, "saturation", 0.0))
            .build();
    } catch (Exception e) {
        // Insights are best-effort; any failure degrades to the empty value.
        return PerformanceInsights.empty();
    }
}
/**
 * Factory method that picks the mesh client matching the configured mesh name.
 * The name is matched case-insensitively; an unset or unrecognised name yields the
 * default client.
 *
 * @param config observability configuration supplying the mesh name
 * @return the mesh client for the configured mesh
 */
private MeshClient createMeshClient(ObservabilityConfig config) {
    String meshName = config.getMeshName();
    // Locale.ROOT keeps the match stable under locales with special casing rules
    // (e.g. Turkish, where "ISTIO".toLowerCase() does not yield "istio").
    String normalized = meshName == null ? "" : meshName.toLowerCase(Locale.ROOT);
    switch (normalized) {
        case "istio":
            return new IstioMeshClient(config);
        case "linkerd":
            return new LinkerdMeshClient(config);
        case "consul":
            return new ConsulMeshClient(config);
        default:
            return new DefaultMeshClient(config);
    }
}
// Helper methods
/**
 * Reads a numeric entry from the map as a {@code double}.
 *
 * @param map          source map; may be {@code null}
 * @param key          entry to read
 * @param defaultValue value returned when the map is null, the key is absent, or the
 *                     entry is not a {@link Number}
 * @return the entry converted with {@link Number#doubleValue()}, or {@code defaultValue}
 */
private double getDouble(Map<String, Object> map, String key, double defaultValue) {
    Object value = map != null ? map.get(key) : null;
    return value instanceof Number ? ((Number) value).doubleValue() : defaultValue;
}
/**
 * Reads a numeric entry from the map as an {@code int}.
 *
 * @param map          source map; may be {@code null}
 * @param key          entry to read
 * @param defaultValue value returned when the map is null, the key is absent, or the
 *                     entry is not a {@link Number}
 * @return the entry converted with {@link Number#intValue()}, or {@code defaultValue}
 */
private int getInt(Map<String, Object> map, String key, int defaultValue) {
    Object value = map != null ? map.get(key) : null;
    return value instanceof Number ? ((Number) value).intValue() : defaultValue;
}
/**
 * Reads a boolean entry from the map.
 *
 * @param map          source map; may be {@code null}
 * @param key          entry to read
 * @param defaultValue value returned when the map is null, the key is absent, or the
 *                     entry is not a {@link Boolean}
 * @return the entry's boolean value, or {@code defaultValue}
 */
private boolean getBoolean(Map<String, Object> map, String key, boolean defaultValue) {
    Object value = map != null ? map.get(key) : null;
    return value instanceof Boolean ? (Boolean) value : defaultValue;
}
/**
 * Returns the number of services in the mesh.
 * Placeholder: always returns 10; a real implementation would query the mesh control plane.
 */
private int getServiceCount() {
    final int placeholderCount = 10;
    return placeholderCount;
}
/**
 * Returns the names of the services connected to this one.
 * Placeholder: always returns a fixed three-element list; a real implementation would
 * query the mesh control plane.
 */
private List<String> getConnectedServices() {
    String[] placeholderNames = {"service-a", "service-b", "service-c"};
    return Arrays.asList(placeholderNames);
}
/**
 * Returns the mesh density.
 * Placeholder: always returns 0.75; a real implementation would derive it from the
 * service connections.
 */
private double calculateMeshDensity() {
    final double placeholderDensity = 0.75;
    return placeholderDensity;
}
/**
 * Returns a security score for the given policies.
 * Placeholder: always returns 85.0 and does not read {@code policies}.
 *
 * @param policies policy state (currently unused)
 */
private double calculateSecurityScore(Map<String, Object> policies) {
    final double placeholderScore = 85.0;
    return placeholderScore;
}
/**
 * Asks the mesh client whether the control plane is healthy.
 *
 * @return the client's answer, or {@code false} if the check throws
 */
private boolean checkControlPlaneHealth() {
    boolean healthy;
    try {
        healthy = meshClient.checkControlPlaneHealth();
    } catch (Exception ignored) {
        // A failed probe is reported as unhealthy.
        healthy = false;
    }
    return healthy;
}
/**
 * Asks the mesh client whether the data plane is healthy.
 *
 * @return the client's answer, or {@code false} if the check throws
 */
private boolean checkDataPlaneHealth() {
    boolean healthy;
    try {
        healthy = meshClient.checkDataPlaneHealth();
    } catch (Exception ignored) {
        // A failed probe is reported as unhealthy.
        healthy = false;
    }
    return healthy;
}
/**
 * Asks the mesh client for per-service health.
 *
 * @return the client's health map, or a single entry marking this service unhealthy
 *         if the check throws
 */
private Map<String, Boolean> checkServiceHealth() {
    Map<String, Boolean> perService;
    try {
        perService = meshClient.checkServiceHealth();
    } catch (Exception ignored) {
        // A failed probe is reported as this service being unhealthy.
        perService = Collections.singletonMap(config.getServiceName(), false);
    }
    return perService;
}
/**
 * Asks the mesh client whether policies are being enforced.
 *
 * @return the client's answer, or {@code false} if the check throws
 */
private boolean checkPolicyEnforcement() {
    boolean enforced;
    try {
        enforced = meshClient.checkPolicyEnforcement();
    } catch (Exception ignored) {
        // A failed probe is reported as "not enforced".
        enforced = false;
    }
    return enforced;
}
/**
 * Stops the mesh executor: requests an orderly shutdown, waits up to 5 seconds, and forces
 * shutdown if the executor has not terminated by then. If the waiting thread is interrupted,
 * shutdown is forced and the interrupt flag is restored.
 */
public void shutdown() {
    meshExecutor.shutdown();
    boolean terminatedInTime;
    try {
        terminatedInTime = meshExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException interrupted) {
        meshExecutor.shutdownNow();
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        return;
    }
    if (!terminatedInTime) {
        meshExecutor.shutdownNow();
    }
}
}
/**
 * Service mesh client interface.
 * Abstracts the mesh-specific calls used for registration, state collection and health checks.
 */
interface MeshClient {

    // --- Registration and discovery ---

    /** Registers a service with the mesh using the supplied registration attributes. */
    void registerService(Map<String, Object> registration);

    /** Returns the endpoints known to the mesh for the given service. */
    List<MeshEndpoint> getEndpoints(String serviceName);

    // --- Mesh state ---

    /** Returns the mesh configuration as key/value pairs. */
    Map<String, Object> getMeshConfiguration();

    /** Returns traffic metrics as key/value pairs. */
    Map<String, Object> getTrafficMetrics();

    /** Returns policy state as key/value pairs. */
    Map<String, Object> getPolicies();

    // --- Health checks ---

    /** Returns whether the control plane is healthy. */
    boolean checkControlPlaneHealth();

    /** Returns whether the data plane is healthy. */
    boolean checkDataPlaneHealth();

    /** Returns a health flag per service name. */
    Map<String, Boolean> checkServiceHealth();

    /** Returns whether policies are being enforced. */
    boolean checkPolicyEnforcement();
}
/**
 * Istio mesh client implementation.
 * Simplified: every method returns fixed sample data instead of querying Istio.
 */
class IstioMeshClient implements MeshClient {
    private final ObservabilityConfig config;

    /**
     * @param config observability configuration; its service name is used by
     *               {@link #checkServiceHealth()}
     */
    public IstioMeshClient(ObservabilityConfig config) {
        this.config = config;
    }

    /** Logs the registration; no call to Istio is made yet. */
    @Override
    public void registerService(Map<String, Object> registration) {
        // Implement Istio service registration
        // java.lang.System has no info(); write the informational message to stdout.
        System.out.println("Registering service with Istio: " + registration);
    }

    /** Returns an empty list (simplified). */
    @Override
    public List<MeshEndpoint> getEndpoints(String serviceName) {
        // Query Istio Pilot for endpoints
        return Collections.emptyList(); // Simplified
    }

    /** Returns fixed sample traffic metrics. */
    @Override
    public Map<String, Object> getTrafficMetrics() {
        // Query Istio Mixer or Telemetry API for metrics
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("requestRate", 100.0);
        metrics.put("errorRate", 0.5);
        metrics.put("p99Latency", 250.0);
        metrics.put("avgLatency", 50.0);
        metrics.put("throughput", 1000.0);
        return metrics;
    }

    /** Returns fixed sample policy state. */
    @Override
    public Map<String, Object> getPolicies() {
        // Query Istio Pilot for policies
        Map<String, Object> policies = new HashMap<>();
        policies.put("mtlsEnabled", true);
        policies.put("authPolicies", 3);
        policies.put("networkPolicies", 5);
        return policies;
    }

    /** Returns fixed sample mesh configuration. */
    @Override
    public Map<String, Object> getMeshConfiguration() {
        // Get Istio mesh configuration
        // Named meshConfig so the local does not shadow the config field.
        Map<String, Object> meshConfig = new HashMap<>();
        meshConfig.put("meshId", "istio-mesh");
        meshConfig.put("version", "1.16.0");
        meshConfig.put("trustDomain", "cluster.local");
        return meshConfig;
    }

    /** Always reports healthy (simplified). */
    @Override
    public boolean checkControlPlaneHealth() {
        // Check Istiod health
        return true; // Simplified
    }

    /** Always reports healthy (simplified). */
    @Override
    public boolean checkDataPlaneHealth() {
        // Check Envoy proxy health
        return true; // Simplified
    }

    /** Reports the configured service as healthy (simplified). */
    @Override
    public Map<String, Boolean> checkServiceHealth() {
        // Check service health via Istio
        Map<String, Boolean> health = new HashMap<>();
        health.put(config.getServiceName(), true);
        return health;
    }

    /** Always reports policies as enforced (simplified). */
    @Override
    public boolean checkPolicyEnforcement() {
        // Check if policies are being enforced
        return true; // Simplified
    }
}
// Similar implementations for LinkerdMeshClient, ConsulMeshClient, DefaultMeshClient
6. Domain Models
/**
 * Service mesh insights data model.
 * Immutable aggregate of the traffic, topology, security and performance insights;
 * instances are created through {@link Builder}.
 */
public class ServiceMeshInsights {
    private final TrafficInsights trafficInsights;
    private final TopologyInsights topologyInsights;
    private final SecurityInsights securityInsights;
    private final PerformanceInsights performanceInsights;

    private ServiceMeshInsights(Builder builder) {
        this.trafficInsights = builder.trafficInsights;
        this.topologyInsights = builder.topologyInsights;
        this.securityInsights = builder.securityInsights;
        this.performanceInsights = builder.performanceInsights;
    }

    // Getters
    public TrafficInsights getTrafficInsights() { return trafficInsights; }
    public TopologyInsights getTopologyInsights() { return topologyInsights; }
    public SecurityInsights getSecurityInsights() { return securityInsights; }
    public PerformanceInsights getPerformanceInsights() { return performanceInsights; }

    /**
     * Returns insights in which every section is its own {@code empty()} value.
     */
    public static ServiceMeshInsights empty() {
        // The only constructor takes a Builder, and a fresh Builder already defaults
        // every section to empty().
        return new Builder().build();
    }

    /** Returns a new builder whose sections all default to their {@code empty()} value. */
    public static Builder builder() {
        return new Builder();
    }

    /** Builder for {@link ServiceMeshInsights}; unset sections stay empty. */
    public static class Builder {
        private TrafficInsights trafficInsights = TrafficInsights.empty();
        private TopologyInsights topologyInsights = TopologyInsights.empty();
        private SecurityInsights securityInsights = SecurityInsights.empty();
        private PerformanceInsights performanceInsights = PerformanceInsights.empty();

        public Builder trafficInsights(TrafficInsights trafficInsights) {
            this.trafficInsights = trafficInsights;
            return this;
        }

        public Builder topologyInsights(TopologyInsights topologyInsights) {
            this.topologyInsights = topologyInsights;
            return this;
        }

        public Builder securityInsights(SecurityInsights securityInsights) {
            this.securityInsights = securityInsights;
            return this;
        }

        public Builder performanceInsights(PerformanceInsights performanceInsights) {
            this.performanceInsights = performanceInsights;
            return this;
        }

        public ServiceMeshInsights build() {
            return new ServiceMeshInsights(this);
        }
    }
}
/**
 * Traffic insights.
 * Immutable value holding request rate, error rate, p99 latency and throughput.
 */
public class TrafficInsights {
    private final double requestRate;
    private final double errorRate;
    private final double p99Latency;
    private final double throughput;

    public TrafficInsights(double requestRate, double errorRate, double p99Latency, double throughput) {
        this.requestRate = requestRate;
        this.errorRate = errorRate;
        this.p99Latency = p99Latency;
        this.throughput = throughput;
    }

    // Getters
    public double getRequestRate() { return requestRate; }
    public double getErrorRate() { return errorRate; }
    public double getP99Latency() { return p99Latency; }
    public double getThroughput() { return throughput; }

    /** Returns insights with every value set to {@code 0.0}. */
    public static TrafficInsights empty() {
        return new TrafficInsights(0.0, 0.0, 0.0, 0.0);
    }

    /**
     * Returns a new builder with every value initialised to {@code 0.0}.
     * Callers use {@code TrafficInsights.builder()}, so the factory must exist alongside
     * the nested {@link Builder} class.
     */
    public static Builder builder() {
        return new Builder();
    }

    /** Builder for {@link TrafficInsights}; unset values default to {@code 0.0}. */
    public static class Builder {
        private double requestRate;
        private double errorRate;
        private double p99Latency;
        private double throughput;

        public Builder requestRate(double requestRate) {
            this.requestRate = requestRate;
            return this;
        }

        public Builder errorRate(double errorRate) {
            this.errorRate = errorRate;
            return this;
        }

        public Builder p99Latency(double p99Latency) {
            this.p99Latency = p99Latency;
            return this;
        }

        public Builder throughput(double throughput) {
            this.throughput = throughput;
            return this;
        }

        public TrafficInsights build() {
            return new TrafficInsights(requestRate, errorRate, p99Latency, throughput);
        }
    }
}
/**
 * Service mesh health model.
 * Immutable snapshot of control-plane, data-plane, per-service and policy health; the
 * status string is derived once at construction.
 */
public class ServiceMeshHealth {
    private final boolean controlPlaneHealthy;
    private final boolean dataPlaneHealthy;
    private final Map<String, Boolean> serviceHealth;
    private final boolean policiesEnforced;
    private final String healthStatus;

    private ServiceMeshHealth(Builder builder) {
        this.controlPlaneHealthy = builder.controlPlaneHealthy;
        this.dataPlaneHealthy = builder.dataPlaneHealthy;
        // Copy the map so later changes to the caller's map cannot make isHealthy()
        // disagree with the status computed below; a null map means "no services reported".
        if (builder.serviceHealth == null) {
            this.serviceHealth = Collections.<String, Boolean>emptyMap();
        } else {
            this.serviceHealth = Collections.unmodifiableMap(new HashMap<>(builder.serviceHealth));
        }
        this.policiesEnforced = builder.policiesEnforced;
        this.healthStatus = calculateHealthStatus();
    }

    /**
     * @return {@code true} only when the control plane, the data plane, every reported
     *         service and policy enforcement are all healthy
     */
    public boolean isHealthy() {
        return controlPlaneHealthy && dataPlaneHealthy &&
            allServicesHealthy() &&
            policiesEnforced;
    }

    /** @return the status code derived at construction, e.g. {@code "HEALTHY"} */
    public String getHealthStatus() {
        return healthStatus;
    }

    /** A service counts as healthy only when its flag is exactly {@code true}; null is unhealthy. */
    private boolean allServicesHealthy() {
        return serviceHealth.values().stream().allMatch(Boolean.TRUE::equals);
    }

    /** Maps the first failing component, checked in a fixed order, to a status code. */
    private String calculateHealthStatus() {
        if (isHealthy()) {
            return "HEALTHY";
        } else if (!controlPlaneHealthy) {
            return "CONTROL_PLANE_UNHEALTHY";
        } else if (!dataPlaneHealthy) {
            return "DATA_PLANE_UNHEALTHY";
        } else if (!allServicesHealthy()) {
            return "SERVICES_UNHEALTHY";
        } else {
            // Everything else is healthy, so policy enforcement is the failing component.
            return "POLICIES_NOT_ENFORCED";
        }
    }

    /**
     * Returns a health object with every component marked unhealthy.
     *
     * @param reason description of the failure; currently not retained on the result
     */
    public static ServiceMeshHealth unhealthy(String reason) {
        // The only constructor takes a Builder, so go through it.
        return new Builder()
            .controlPlaneHealthy(false)
            .dataPlaneHealthy(false)
            .serviceHealth(Collections.singletonMap("unknown", false))
            .policiesEnforced(false)
            .build();
    }

    /** Returns a new builder with all flags {@code false} and no services reported. */
    public static Builder builder() {
        return new Builder();
    }

    /** Builder for {@link ServiceMeshHealth}. */
    public static class Builder {
        private boolean controlPlaneHealthy;
        private boolean dataPlaneHealthy;
        private Map<String, Boolean> serviceHealth = new HashMap<>();
        private boolean policiesEnforced;

        public Builder controlPlaneHealthy(boolean controlPlaneHealthy) {
            this.controlPlaneHealthy = controlPlaneHealthy;
            return this;
        }

        public Builder dataPlaneHealthy(boolean dataPlaneHealthy) {
            this.dataPlaneHealthy = dataPlaneHealthy;
            return this;
        }

        public Builder serviceHealth(Map<String, Boolean> serviceHealth) {
            this.serviceHealth = serviceHealth;
            return this;
        }

        public Builder policiesEnforced(boolean policiesEnforced) {
            this.policiesEnforced = policiesEnforced;
            return this;
        }

        public ServiceMeshHealth build() {
            return new ServiceMeshHealth(this);
        }
    }
}
// Additional domain models for TopologyInsights, SecurityInsights, PerformanceInsights, MeshEndpoint
7. Spring Boot Integration
/**
 * Spring Boot Auto-Configuration.
 * Builds the {@link ServiceMeshObservability} singleton from
 * {@link ServiceMeshObservabilityProperties} and contributes the observability aspect and
 * HTTP filter. Every bean backs off when the application already defines one of that type.
 */
@Configuration
@EnableConfigurationProperties(ServiceMeshObservabilityProperties.class)
public class ServiceMeshObservabilityAutoConfiguration {

    /**
     * Creates the observability manager from the bound configuration properties.
     *
     * @param properties values bound from the {@code servicemesh.observability} prefix
     * @return the instance returned by {@code ServiceMeshObservability.initialize}
     */
    @Bean
    @ConditionalOnMissingBean
    public ServiceMeshObservability serviceMeshObservability(ServiceMeshObservabilityProperties properties) {
        ObservabilityConfig config = ObservabilityConfig.builder()
            .serviceName(properties.getServiceName())
            .serviceVersion(properties.getServiceVersion())
            .environment(properties.getEnvironment())
            .meshName(properties.getMeshName())
            .clusterName(properties.getClusterName())
            .otlpEndpoint(properties.getOtlpEndpoint())
            .metricsPort(properties.getMetricsPort())
            .enableTracing(properties.isEnableTracing())
            .enableMetrics(properties.isEnableMetrics())
            .enableLogging(properties.isEnableLogging())
            .build();
        return ServiceMeshObservability.initialize(config);
    }

    // ObservabilityAspect and ObservabilityFilter are also annotated @Component, so
    // component scanning may already have registered them. Back off in that case
    // instead of declaring a second bean with the same name.
    // NOTE(review): both constructors call ServiceMeshObservability.getInstance(); confirm
    // the singleton is initialized before these beans are created.

    /** Registers the {@code @Observable} aspect unless one is already defined. */
    @Bean
    @ConditionalOnMissingBean
    public ObservabilityAspect observabilityAspect() {
        return new ObservabilityAspect();
    }

    /** Registers the HTTP observability filter unless one is already defined. */
    @Bean
    @ConditionalOnMissingBean
    public ObservabilityFilter observabilityFilter() {
        return new ObservabilityFilter();
    }
}
/**
 * Configuration properties bound from the {@code servicemesh.observability} prefix.
 * Every property has a default, so an empty configuration is valid.
 */
@ConfigurationProperties(prefix = "servicemesh.observability")
public class ServiceMeshObservabilityProperties {

    // --- Service identity ---
    private String serviceName = "unknown-service";
    private String serviceVersion = "1.0.0";
    private String environment = "development";

    // --- Mesh placement ---
    private String meshName = "istio";
    private String clusterName = "default";

    // --- Export targets ---
    private String otlpEndpoint = "http://localhost:4317";
    private int metricsPort = 9464;

    // --- Feature switches ---
    private boolean enableTracing = true;
    private boolean enableMetrics = true;
    private boolean enableLogging = true;

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceVersion() {
        return serviceVersion;
    }

    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }

    public String getEnvironment() {
        return environment;
    }

    public void setEnvironment(String environment) {
        this.environment = environment;
    }

    public String getMeshName() {
        return meshName;
    }

    public void setMeshName(String meshName) {
        this.meshName = meshName;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getOtlpEndpoint() {
        return otlpEndpoint;
    }

    public void setOtlpEndpoint(String otlpEndpoint) {
        this.otlpEndpoint = otlpEndpoint;
    }

    public int getMetricsPort() {
        return metricsPort;
    }

    public void setMetricsPort(int metricsPort) {
        this.metricsPort = metricsPort;
    }

    public boolean isEnableTracing() {
        return enableTracing;
    }

    public void setEnableTracing(boolean enableTracing) {
        this.enableTracing = enableTracing;
    }

    public boolean isEnableMetrics() {
        return enableMetrics;
    }

    public void setEnableMetrics(boolean enableMetrics) {
        this.enableMetrics = enableMetrics;
    }

    public boolean isEnableLogging() {
        return enableLogging;
    }

    public void setEnableLogging(boolean enableLogging) {
        this.enableLogging = enableLogging;
    }
}
/**
 * Spring AOP aspect for automatic observability.
 * Wraps every method annotated with {@link Observable} in an observability operation.
 */
@Aspect
@Component
public class ObservabilityAspect {
    private final ServiceMeshObservability observability;

    public ObservabilityAspect() {
        this.observability = ServiceMeshObservability.getInstance();
    }

    /**
     * Runs the intercepted method inside an observability scope and records its outcome.
     *
     * @param joinPoint  the intercepted invocation
     * @param observable the annotation on the method; a non-empty value names the operation,
     *                   otherwise the short method signature is used
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws, rethrown unchanged
     */
    @Around("@annotation(observable)")
    public Object observeMethod(ProceedingJoinPoint joinPoint, Observable observable) throws Throwable {
        String operationName = !observable.value().isEmpty() ?
            observable.value() : joinPoint.getSignature().toShortString();
        Map<String, String> attributes = new HashMap<>();
        attributes.put("class", joinPoint.getSignature().getDeclaringTypeName());
        attributes.put("method", joinPoint.getSignature().getName());
        try (ObservabilityScope scope = observability.startOperation(operationName, attributes)) {
            try {
                Object result = joinPoint.proceed();
                scope.success();
                return result;
            } catch (Throwable throwable) {
                // Record the failure on the scope that is still open. A catch attached to the
                // try-with-resources would run after close() and would have to start a second
                // operation that nothing closes.
                scope.error(throwable);
                throw throwable;
            }
        }
    }
}
/**
* HTTP filter for observability
*/
@Component
public class ObservabilityFilter implements Filter {
private final ServiceMeshObservability observability;
public ObservabilityFilter() {
this.observability = ServiceMeshObservability.getInstance();
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
// Extract headers for context propagation
Map<String, String> headers = extractHeaders(httpRequest);
Context context = observability.extractContext(headers);
String operationName = httpRequest.getMethod() + " " + httpRequest.getRequestURI();
Map<String, String> attributes = new HashMap<>();
attributes.put("http.method", httpRequest.getMethod());
attributes.put("http.route", httpRequest.getRequestURI());
attributes.put("http.user_agent", httpRequest.getHeader("User-Agent"));
try (ObservabilityScope scope = observability.startOperation(operationName, attributes)) {
// Add tracing headers to response
Map<String, String> propagationHeaders = scope.getPropagationHeaders();
for (Map.Entry<String, String> header : propagationHeaders.entrySet()) {
httpResponse.setHeader(header.getKey(), header.getValue());
}
chain.doFilter(request, response);
scope.setAttribute("http.status_code", String.valueOf(httpResponse.getStatus()));
if (httpResponse.getStatus() < 400) {
scope.success();
} else {
scope.error("HTTP_" + httpResponse.getStatus(), "HTTP error");
}
} catch (Exception e) {
observability.startOperation(operationName, attributes).error(e);
throw e;
}
}
private Map<String, String> extractHeaders(HttpServletRequest request) {
Map<String, String> headers = new HashMap<>();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.put(headerName, request.getHeader(headerName));
}
return headers;
}
}
/**
 * Marks a method whose invocations should be observed.
 * The annotation is retained at runtime and may only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Observable {

    /** Operation name; defaults to the empty string. */
    String value() default "";
}
8. Usage Examples
/**
 * Demo service showing observability usage: manual scopes, events, business metrics,
 * client-call instrumentation and mesh health checks.
 */
@Service
public class OrderService {
    private final ServiceMeshObservability observability;
    private final DistributedLogger logger;
    // NOTE(review): processPayment uses a paymentClient that is not declared in this class;
    // it has to be declared and injected before this example compiles.

    public OrderService() {
        this.observability = ServiceMeshObservability.getInstance();
        this.logger = observability.getDistributedLogger();
    }

    /**
     * Processes an order: validates it, takes payment, updates inventory, records the order
     * value as a business metric and creates the order.
     *
     * @param request the order to process
     * @return the created order
     * @throws ValidationException if the order is invalid
     * @throws PaymentException    if payment processing fails with that exception
     */
    // NOTE(review): the method is both @Observable and opens its own "order.process" scope,
    // which looks like it yields two operations per proxied call - confirm this is intended.
    @Observable("order.process")
    public Order processOrder(OrderRequest request) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("order.id", request.getOrderId());
        attributes.put("customer.id", request.getCustomerId());
        attributes.put("amount", String.valueOf(request.getAmount()));
        try (ObservabilityScope scope = observability.startOperation("order.process", attributes)) {
            try {
                logger.info("Processing order", Map.of(
                    "orderId", request.getOrderId(),
                    "customerId", request.getCustomerId(),
                    "amount", request.getAmount()
                ));
                // Validate order
                scope.addEvent("order.validation.start", Collections.emptyMap());
                validateOrder(request);
                scope.addEvent("order.validation.complete", Collections.emptyMap());
                // Process payment
                // NOTE(review): a non-SUCCESS payment result is returned rather than thrown,
                // so the order is still created - confirm this is intended.
                scope.addEvent("payment.processing.start", Collections.emptyMap());
                PaymentResult payment = processPayment(request);
                // Map.of rejects null values, so a missing status is rendered as "null".
                scope.addEvent("payment.processing.complete",
                    Map.of("payment.status", String.valueOf(payment.getStatus())));
                // Update inventory
                updateInventory(request);
                // Record business metric
                observability.recordBusinessMetric("order.value", request.getAmount(),
                    Map.of("customer.tier", getCustomerTier(request.getCustomerId())));
                Order order = createOrder(request, payment);
                scope.success();
                return order;
            } catch (ValidationException e) {
                // Record the failure on the scope that is still open, rather than starting
                // a second operation that nothing closes.
                scope.error(e);
                // String.valueOf keeps Map.of from throwing when the exception has no message.
                logger.error("Order validation failed",
                    Map.of("orderId", request.getOrderId(), "error", String.valueOf(e.getMessage())), e);
                throw e;
            } catch (PaymentException e) {
                scope.error(e);
                logger.error("Payment processing failed",
                    Map.of("orderId", request.getOrderId(), "error", String.valueOf(e.getMessage())), e);
                throw e;
            }
        }
    }

    /**
     * Processes an order with the context extracted from the incoming headers made current.
     *
     * @param request the order to process
     * @param headers incoming request headers carrying the propagated context
     * @return the created order
     */
    public Order processOrderWithMeshContext(OrderRequest request, Map<String, String> headers) {
        // Extract context from incoming headers
        Context context = observability.extractContext(headers);
        try (io.opentelemetry.context.Scope ignored = context.makeCurrent()) {
            return processOrder(request);
        }
    }

    /** Logs the current mesh health and insights and triggers the fallback when unhealthy. */
    public void checkServiceMeshHealth() {
        ServiceMeshHealth health = observability.getMeshHealth();
        ServiceMeshInsights insights = observability.getMeshInsights();
        logger.info("Service mesh health check", Map.of(
            "healthy", health.isHealthy(),
            "status", health.getHealthStatus(),
            "traffic_rate", insights.getTrafficInsights().getRequestRate(),
            "error_rate", insights.getTrafficInsights().getErrorRate()
        ));
        if (!health.isHealthy()) {
            // Trigger alert or fallback behavior
            handleMeshUnhealthy(health);
        }
    }

    /** Rejects orders whose amount is zero or negative. */
    private void validateOrder(OrderRequest request) {
        // Validation logic
        if (request.getAmount() <= 0) {
            throw new ValidationException("Invalid order amount");
        }
    }

    /**
     * Calls the payment service inside a client-call scope, passing the propagation headers,
     * and records the payment status and transaction id on that scope.
     */
    private PaymentResult processPayment(OrderRequest request) {
        // Make external payment service call with observability
        Map<String, String> callAttributes = new HashMap<>();
        callAttributes.put("payment.gateway", "stripe");
        callAttributes.put("amount", String.valueOf(request.getAmount()));
        try (ObservabilityScope scope = observability.startClientCall(
                "payment-service", "process", callAttributes)) {
            // Add headers for propagation
            Map<String, String> headers = scope.getPropagationHeaders();
            // Make HTTP call with headers
            PaymentResult result = paymentClient.processPayment(request, headers);
            scope.setAttribute("payment.status", result.getStatus());
            scope.setAttribute("payment.transaction_id", result.getTransactionId());
            if ("SUCCESS".equals(result.getStatus())) {
                scope.success();
            } else {
                scope.error("PAYMENT_FAILED", result.getErrorMessage());
            }
            return result;
        }
    }

    /** Records an inventory-update metric tagged with the number of items in the order. */
    private void updateInventory(OrderRequest request) {
        // Inventory update logic
        observability.recordBusinessMetric("inventory.update", 1.0,
            Map.of("product.count", String.valueOf(request.getItems().size())));
    }

    /** Creates the order under a freshly generated random UUID. */
    private Order createOrder(OrderRequest request, PaymentResult payment) {
        return new Order(UUID.randomUUID().toString(), request, payment);
    }

    /** Placeholder: every customer is in the "standard" tier. */
    private String getCustomerTier(String customerId) {
        // Determine customer tier
        return "standard";
    }

    /** Logs a warning carrying the mesh health status. */
    private void handleMeshUnhealthy(ServiceMeshHealth health) {
        // Handle unhealthy mesh scenario
        logger.warn("Service mesh is unhealthy, implementing fallback",
            Map.of("health_status", health.getHealthStatus()));
    }
}
/**
 * Demo application: initializes observability, starts Spring Boot, then processes one
 * sample order and checks mesh health.
 */
@SpringBootApplication
@EnableAspectJAutoProxy
public class ServiceMeshObservabilityDemo {

    public static void main(String[] args) {
        // Observability is initialized before the Spring context is started.
        ServiceMeshObservability.initialize(demoConfig());
        SpringApplication.run(ServiceMeshObservabilityDemo.class, args);
    }

    /** Builds the fixed configuration used by the demo. */
    private static ObservabilityConfig demoConfig() {
        return ObservabilityConfig.builder()
            .serviceName("order-service")
            .serviceVersion("1.0.0")
            .environment("production")
            .meshName("istio")
            .clusterName("production-cluster")
            .otlpEndpoint("http://jaeger:4317")
            .metricsPort(9464)
            .build();
    }

    /** Runner that processes a sample order and then checks mesh health. */
    @Bean
    public CommandLineRunner demo(OrderService orderService) {
        return args -> {
            OrderRequest sampleOrder = new OrderRequest("cust-123", "order-456", 99.99);
            orderService.processOrder(sampleOrder);
            orderService.checkServiceMeshHealth();
        };
    }
}
Key Features
- Distributed Tracing: End-to-end tracing with OpenTelemetry
- Metrics Collection: Business and mesh-specific metrics
- Structured Logging: Context-aware distributed logging
- Service Mesh Integration: Istio, Linkerd, Consul support
- Health Monitoring: Mesh and service health checks
- Spring Boot Integration: Auto-configuration and AOP
- Context Propagation: W3C trace context and baggage
- Performance Insights: Traffic, topology, security insights
Configuration Example
# application.yml
servicemesh:
  observability:
    service-name: "order-service"
    service-version: "1.0.0"
    environment: "production"
    mesh-name: "istio"
    cluster-name: "production-cluster"
    otlp-endpoint: "http://jaeger:4317"
    metrics-port: 9464
    enable-tracing: true
    enable-metrics: true
    enable-logging: true
This comprehensive observability implementation provides production-ready monitoring for Java microservices in service mesh environments with minimal configuration and maximum insights.