Thanos is a highly available metrics system that provides a global view of Prometheus metrics across multiple clusters. This guide demonstrates how to integrate Thanos with Java applications for querying, storing, and analyzing global metrics data.
Why Use Thanos for Global View?
- Unified Querying: Query metrics across multiple Prometheus instances
- Long-Term Storage: Extend retention beyond Prometheus limits
- High Availability: Global deduplication and fault tolerance
- Horizontal Scaling: Handle massive metrics volumes
- Cross-Cluster Visibility: Single pane of glass for all metrics
Prerequisites
- Thanos deployment (Query, Store, Sidecar, Compactor components)
- Java 11+ with HTTP client capabilities
- Maven/Gradle for dependency management
Step 1: Project Dependencies
Maven (pom.xml):
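<dependencies>
    <!-- Prometheus Java client (metrics exposition) -->
    <dependency>
        <groupId>io.prometheus</groupId>
        <artifactId>simpleclient</artifactId>
        <version>0.16.0</version>
    </dependency>
    <dependency>
        <groupId>io.prometheus</groupId>
        <artifactId>simpleclient_httpserver</artifactId>
        <version>0.16.0</version>
    </dependency>
    <!-- HTTP Client -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
    <!-- JSON Processing (jsr310 module provides JavaTimeModule) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.15.2</version>
    </dependency>
    <!-- Protobuf for Thanos StoreAPI -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.22.2</version>
    </dependency>
    <!-- gRPC for StoreAPI -->
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty</artifactId>
        <version>1.53.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.53.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.53.0</version>
    </dependency>
    <!-- Micrometer for metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
        <version>1.10.5</version>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
        <version>1.10.5</version>
    </dependency>
    <!-- Cache (Caffeine backs spring.cache.type=caffeine) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-cache</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
        <version>2.9.3</version>
    </dependency>
    <!-- Spring Boot (Optional) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- Bean validation for @Valid, @NotBlank and @NotNull in the controllers -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- Lombok for @Data, @Builder and @Slf4j used throughout this guide -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <!-- Commons Math for DescriptiveStatistics in the anomaly detection code -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-math3</artifactId>
        <version>3.6.1</version>
    </dependency>
</dependencies>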
Step 2: Configuration Classes
@Configuration
@ConfigurationProperties(prefix = "thanos")
@Data
public class ThanosConfig {
// Thanos Query endpoints
private List<String> queryEndpoints = Arrays.asList("http://localhost:10904");
private String queryApiPath = "/api/v1";
// Thanos Store endpoints for direct gRPC access
private List<String> storeEndpoints = Arrays.asList("localhost:10901");
// Query settings
private long queryTimeout = 30000; // milliseconds
private int maxSamples = 50000000;
private boolean enableDeduplication = true;
private boolean enablePartialResponse = true;
private String replicaLabel = "replica";
// StoreAPI settings
private long storeTimeout = 30000; // milliseconds
private int storeMaxConcurrent = 20;
// Cache settings
private boolean enableQueryCache = true;
private long queryCacheDuration = 300; // seconds
private int queryCacheSize = 1000;
// Authentication
private String bearerToken;
private boolean tlsEnabled = false;
private String tlsCertPath;
private String tlsKeyPath;
// Monitoring
private boolean enableMetrics = true;
private int metricsPort = 9090;
public String getQueryUrl(String endpoint) {
return endpoint + queryApiPath;
}
}
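The `replicaLabel` and `enableDeduplication` settings relate to how Thanos Query treats series that differ only in their replica label as one logical series. The sketch below illustrates just the grouping idea: strip the replica label, then group identical label sets. The class `ReplicaDedupSketch` and its methods are hypothetical names, and Thanos merges the samples of such series with its own algorithm, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: groups series whose labels match once the replica label is removed.
final class ReplicaDedupSketch {

    // Returns a sorted copy of the labels without the replica label.
    static Map<String, String> stripReplica(Map<String, String> labels, String replicaLabel) {
        Map<String, String> stripped = new TreeMap<>(labels);
        stripped.remove(replicaLabel);
        return stripped;
    }

    // Groups label sets that become identical after the replica label is stripped.
    static Map<Map<String, String>, List<Map<String, String>>> groupReplicas(
            List<Map<String, String>> series, String replicaLabel) {
        Map<Map<String, String>, List<Map<String, String>>> groups = new LinkedHashMap<>();
        for (Map<String, String> labels : series) {
            groups.computeIfAbsent(stripReplica(labels, replicaLabel), k -> new ArrayList<>())
                  .add(labels);
        }
        return groups;
    }
}
```

With two replicas scraping the same target, both series land in a single group, which is the point at which a deduplicating query layer would pick or merge samples.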
@Component
@Slf4j
public class ThanosClientFactory {
private final ThanosConfig config;
private final ObjectMapper objectMapper;
private final List<CloseableHttpClient> httpClients = new ArrayList<>();
public ThanosClientFactory(ThanosConfig config) {
this.config = config;
this.objectMapper = new ObjectMapper();
initializeObjectMapper();
}
public ThanosQueryClient createQueryClient() {
List<String> endpoints = config.getQueryEndpoints();
if (endpoints.isEmpty()) {
throw new IllegalStateException("No Thanos Query endpoints configured");
}
// Create HTTP clients for each endpoint
List<CloseableHttpClient> clients = endpoints.stream()
.map(this::createHttpClient)
.collect(Collectors.toList());
httpClients.addAll(clients);
return new ThanosQueryClient(config, clients, objectMapper);
}
public ThanosStoreClient createStoreClient() {
return new ThanosStoreClient(config);
}
private CloseableHttpClient createHttpClient(String endpoint) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout((int) config.getQueryTimeout())
.setSocketTimeout((int) config.getQueryTimeout())
.build();
HttpClientBuilder builder = HttpClients.custom()
.setDefaultRequestConfig(requestConfig)
.setMaxConnTotal(100)
.setMaxConnPerRoute(20);
// Add authentication if configured
if (config.getBearerToken() != null && !config.getBearerToken().isEmpty()) {
builder.setDefaultHeaders(Collections.singletonList(
new BasicHeader("Authorization", "Bearer " + config.getBearerToken())
));
}
return builder.build();
}
private void initializeObjectMapper() {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@PreDestroy
public void cleanup() {
httpClients.forEach(client -> {
try {
client.close();
} catch (IOException e) {
log.warn("Error closing HTTP client", e);
}
});
}
}
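For comparison, the same request setup (timeout, `Accept` header, optional bearer token) can be expressed with the `java.net.http` client that ships with Java 11, without Apache HttpClient. This is a minimal sketch; `ThanosRequestSketch` and `build` are hypothetical names, and the request is only constructed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

final class ThanosRequestSketch {

    // Builds (but does not send) a GET request with an optional bearer token.
    static HttpRequest build(String url, String bearerToken, long timeoutMillis) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofMillis(timeoutMillis))
                .header("Accept", "application/json")
                .GET();
        // Only attach the Authorization header when a token is configured
        if (bearerToken != null && !bearerToken.isEmpty()) {
            builder.header("Authorization", "Bearer " + bearerToken);
        }
        return builder.build();
    }
}
```

The resulting `HttpRequest` can be passed to `HttpClient.send(...)` with a string body handler.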
Step 3: Core Data Models
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThanosQueryRequest {
private String query;
private Instant startTime;
private Instant endTime;
private Duration step;
private Long timeout;
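// @Builder ignores field initializers unless they are marked @Builder.Default
@Builder.Default
private boolean deduplication = true;
@Builder.Default
private boolean partialResponse = true;
private Map<String, String> matchers;
@Builder.Default
private QueryType queryType = QueryType.RANGE;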
public enum QueryType {
RANGE, INSTANT, SERIES, LABELS, LABEL_VALUES
}
public Map<String, String> toQueryParams() {
Map<String, String> params = new HashMap<>();
if (queryType == QueryType.RANGE) {
params.put("query", query);
params.put("start", startTime.toString());
params.put("end", endTime.toString());
params.put("step", step.getSeconds() + "s");
} else if (queryType == QueryType.INSTANT) {
params.put("query", query);
if (startTime != null) {
params.put("time", startTime.toString());
}
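} else if (queryType == QueryType.SERIES) {
if (matchers != null && !matchers.isEmpty()) {
// Combine the label matchers into one series selector, e.g. {job="api",cluster="eu"}
// (assumes map keys are label names and map values are the label values to match)
params.put("match[]", matchers.entrySet().stream()
.map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
.collect(Collectors.joining(",", "{", "}")));
}
if (startTime != null) {
params.put("start", startTime.toString());
}
if (endTime != null) {
params.put("end", endTime.toString());
}
}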
if (timeout != null) {
params.put("timeout", timeout + "s");
}
params.put("dedup", String.valueOf(deduplication));
params.put("partial_response", String.valueOf(partialResponse));
return params;
}
}
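To make the parameter mapping concrete, the sketch below assembles a complete `query_range` URL from the same fields that `toQueryParams()` emits. It uses only the JDK; `RangeQueryUrlSketch` is a hypothetical helper and the base URL in the usage note is an example value.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RangeQueryUrlSketch {

    static String build(String baseUrl, String promql, Instant start, Instant end,
                        Duration step, boolean dedup, boolean partialResponse) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps a stable parameter order
        params.put("query", promql);
        params.put("start", start.toString());          // RFC 3339 timestamp
        params.put("end", end.toString());
        params.put("step", step.getSeconds() + "s");
        params.put("dedup", String.valueOf(dedup));
        params.put("partial_response", String.valueOf(partialResponse));
        // URL-encode keys and values, then join with "&"
        String queryString = params.entrySet().stream()
                .map(e -> enc(e.getKey()) + "=" + enc(e.getValue()))
                .collect(Collectors.joining("&"));
        return baseUrl + "/query_range?" + queryString;
    }

    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

For example, `build("http://localhost:10904/api/v1", "up{cluster=\"a\"}", start, end, Duration.ofMinutes(1), true, true)` yields a URL whose `query` parameter is percent-encoded and whose `step` is `60s`.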
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThanosQueryResponse {
private String status;
private QueryData data;
private String errorType;
private String error;
private List<String> warnings;
private Map<String, Object> stats;
public boolean isSuccess() {
return "success".equals(status);
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QueryData {
private String resultType;
private List<QueryResult> result;
public List<MetricSeries> getAsSeries() {
if ("matrix".equals(resultType) && result != null) {
return result.stream()
.map(QueryResult::toMetricSeries)
.collect(Collectors.toList());
}
return Collections.emptyList();
}
public List<InstantVector> getAsInstantVector() {
// Instant queries return resultType "vector"; each entry of "result" carries a single "value" pair
if ("vector".equals(resultType) && result != null) {
return result.stream()
.map(r -> new VectorResult(r.getMetric(), r.getValue()).toInstantVector())
.collect(Collectors.toList());
}
return Collections.emptyList();
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QueryResult {
private Map<String, String> metric;
// Range queries ("matrix") populate values; instant queries ("vector") populate value
private List<List<Object>> values;
private List<Object> value;
public MetricSeries toMetricSeries() {
List<DataPoint> dataPoints = values.stream()
.map(v -> new DataPoint(
// Timestamps are Unix seconds and may carry a fractional part
Instant.ofEpochMilli(Math.round(((Number) v.get(0)).doubleValue() * 1000)),
Double.parseDouble(v.get(1).toString())
))
.collect(Collectors.toList());
return new MetricSeries(metric, dataPoints);
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VectorResult {
private Map<String, String> metric;
private List<Object> value;
public InstantVector toInstantVector() {
return new InstantVector(
metric,
// Timestamps are Unix seconds and may carry a fractional part
Instant.ofEpochMilli(Math.round(((Number) value.get(0)).doubleValue() * 1000)),
Double.parseDouble(value.get(1).toString())
);
}
}
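The Prometheus HTTP API encodes each sample as a `[timestamp, "value"]` pair, where the timestamp is Unix seconds (possibly fractional) and the value is a string that may be `NaN`, `+Inf` or `-Inf`. `Double.parseDouble` accepts `NaN` but rejects `+Inf` and `-Inf`, so a small helper along these lines may be useful. `SampleParserSketch` is a hypothetical name, not part of the classes above.

```java
import java.time.Instant;

final class SampleParserSketch {

    // Parses a sample value string, mapping Prometheus infinities to Java doubles.
    static double parseValue(String raw) {
        switch (raw) {
            case "+Inf":
                return Double.POSITIVE_INFINITY;
            case "-Inf":
                return Double.NEGATIVE_INFINITY;
            default:
                return Double.parseDouble(raw); // also handles "NaN"
        }
    }

    // Converts Unix seconds (integer or fractional) to an Instant with millisecond precision.
    static Instant parseTimestamp(Number seconds) {
        return Instant.ofEpochMilli(Math.round(seconds.doubleValue() * 1000.0));
    }
}
```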
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MetricSeries {
private Map<String, String> labels;
private List<DataPoint> dataPoints;
private String sourceCluster;
public MetricSeries(Map<String, String> labels, List<DataPoint> dataPoints) {
this.labels = labels;
this.dataPoints = dataPoints;
this.sourceCluster = labels.get("cluster");
}
public Double getLatestValue() {
if (dataPoints == null || dataPoints.isEmpty()) {
return null;
}
return dataPoints.get(dataPoints.size() - 1).getValue();
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataPoint {
// @AllArgsConstructor already generates DataPoint(Instant, Double);
// declaring the same constructor by hand would not compile
private Instant timestamp;
private Double value;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class InstantVector {
private Map<String, String> labels;
private Instant timestamp;
private Double value;
private String sourceCluster;
public InstantVector(Map<String, String> labels, Instant timestamp, Double value) {
this.labels = labels;
this.timestamp = timestamp;
this.value = value;
this.sourceCluster = labels.get("cluster");
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GlobalViewStats {
private int totalClusters;
private int activeClusters;
private Map<String, ClusterStats> clusterStats;
private long totalSamples;
private Instant oldestDataPoint;
private Instant newestDataPoint;
private Map<String, Long> metricsByType;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClusterStats {
private String clusterName;
private boolean reachable;
private long totalMetrics;
private long totalSamples;
private Instant minTimestamp;
private Instant maxTimestamp;
private Duration dataLatency;
private Map<String, Object> additionalInfo;
}
Step 4: Core Service Classes
Thanos Query Client
@Service
@Slf4j
public class ThanosQueryClient {
private final ThanosConfig config;
private final List<CloseableHttpClient> httpClients;
private final ObjectMapper objectMapper;
private final Random random = new Random();
public ThanosQueryClient(ThanosConfig config, List<CloseableHttpClient> httpClients, ObjectMapper objectMapper) {
this.config = config;
this.httpClients = httpClients;
this.objectMapper = objectMapper;
}
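// The request object itself is the cache key (Lombok's @Data supplies equals/hashCode);
// keying on hashCode() alone could return a cached response for a different query on a collision.
// Spring's proxy-based caching only applies to calls from other beans, so the internal
// calls from queryRange() and queryInstant() bypass the cache.
@Cacheable(value = "thanosQueries", key = "#request")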
public ThanosQueryResponse executeQuery(ThanosQueryRequest request) {
String endpoint = getRandomEndpoint();
String url = buildQueryUrl(endpoint, request);
try {
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Accept", "application/json");
try (CloseableHttpResponse response = getHttpClient(endpoint).execute(httpGet)) {
String responseBody = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != 200) {
throw new ThanosException("Query failed with status: " +
response.getStatusLine().getStatusCode() + " - " + responseBody);
}
ThanosQueryResponse queryResponse = objectMapper.readValue(
responseBody, ThanosQueryResponse.class);
if (!queryResponse.isSuccess()) {
log.warn("Query execution failed: {} - {}",
queryResponse.getErrorType(), queryResponse.getError());
}
return queryResponse;
}
} catch (Exception e) {
log.error("Failed to execute Thanos query: {}", url, e);
throw new ThanosException("Query execution failed", e);
}
}
public List<MetricSeries> queryRange(String query, Instant start, Instant end, Duration step) {
ThanosQueryRequest request = ThanosQueryRequest.builder()
.query(query)
.startTime(start)
.endTime(end)
.step(step)
.queryType(ThanosQueryRequest.QueryType.RANGE)
.deduplication(config.isEnableDeduplication())
.partialResponse(config.isEnablePartialResponse())
.build();
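ThanosQueryResponse response = executeQuery(request);
if (!response.isSuccess() || response.getData() == null) {
throw new ThanosException("Range query failed: " + response.getError());
}
return response.getData().getAsSeries();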
}
public List<InstantVector> queryInstant(String query, Instant time) {
ThanosQueryRequest request = ThanosQueryRequest.builder()
.query(query)
.startTime(time)
.queryType(ThanosQueryRequest.QueryType.INSTANT)
.deduplication(config.isEnableDeduplication())
.partialResponse(config.isEnablePartialResponse())
.build();
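ThanosQueryResponse response = executeQuery(request);
if (!response.isSuccess() || response.getData() == null) {
throw new ThanosException("Instant query failed: " + response.getError());
}
return response.getData().getAsInstantVector();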
}
public List<String> getLabelValues(String labelName, Instant start, Instant end) {
String endpoint = getRandomEndpoint();
String url = config.getQueryUrl(endpoint) + "/label/" + labelName + "/values";
Map<String, String> params = new HashMap<>();
params.put("start", start.toString());
params.put("end", end.toString());
url = addQueryParams(url, params);
try {
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Accept", "application/json");
try (CloseableHttpResponse response = getHttpClient(endpoint).execute(httpGet)) {
String responseBody = EntityUtils.toString(response.getEntity());
// The label values endpoint returns "data" as a plain JSON array of strings,
// so it is read as a tree instead of being mapped onto ThanosQueryResponse
JsonNode root = objectMapper.readTree(responseBody);
if (!"success".equals(root.path("status").asText())) {
return Collections.emptyList();
}
List<String> values = new ArrayList<>();
root.path("data").forEach(node -> values.add(node.asText()));
return values;
}
} catch (Exception e) {
log.error("Failed to get label values for: {}", labelName, e);
throw new ThanosException("Label values query failed", e);
}
}
public Set<String> getActiveClusters(Instant start, Instant end) {
// Query for unique cluster labels across all metrics
List<String> clusterLabels = getLabelValues("cluster", start, end);
return new HashSet<>(clusterLabels);
}
public GlobalViewStats getGlobalViewStats(Instant start, Instant end) {
Set<String> clusters = getActiveClusters(start, end);
Map<String, ClusterStats> clusterStats = new HashMap<>();
for (String cluster : clusters) {
clusterStats.put(cluster, getClusterStats(cluster, start, end));
}
return GlobalViewStats.builder()
.totalClusters(clusters.size())
.activeClusters((int) clusterStats.values().stream()
.filter(ClusterStats::isReachable)
.count())
.clusterStats(clusterStats)
.oldestDataPoint(getOldestDataPoint(clusterStats))
.newestDataPoint(getNewestDataPoint(clusterStats))
.metricsByType(getMetricsDistribution(start, end))
.build();
}
private ClusterStats getClusterStats(String cluster, Instant start, Instant end) {
try {
// Query cluster-specific metrics to determine health and stats
String query = String.format("up{cluster=\"%s\"}", cluster);
List<MetricSeries> results = queryRange(query, start, end, Duration.ofMinutes(1));
ClusterStats stats = ClusterStats.builder()
.clusterName(cluster)
.reachable(!results.isEmpty())
.build();
if (!results.isEmpty()) {
// Calculate additional statistics
MetricSeries series = results.get(0);
stats.setMinTimestamp(series.getDataPoints().get(0).getTimestamp());
stats.setMaxTimestamp(series.getDataPoints().get(series.getDataPoints().size() - 1).getTimestamp());
stats.setDataLatency(Duration.between(stats.getMaxTimestamp(), Instant.now()));
}
return stats;
} catch (Exception e) {
log.warn("Failed to get stats for cluster: {}", cluster, e);
return ClusterStats.builder()
.clusterName(cluster)
.reachable(false)
.build();
}
}
private String buildQueryUrl(String endpoint, ThanosQueryRequest request) {
String baseUrl = config.getQueryUrl(endpoint);
String path;
switch (request.getQueryType()) {
case RANGE:
path = "/query_range";
break;
case INSTANT:
path = "/query";
break;
case SERIES:
path = "/series";
break;
default:
path = "/query";
}
String url = baseUrl + path;
return addQueryParams(url, request.toQueryParams());
}
private String addQueryParams(String url, Map<String, String> params) {
if (params == null || params.isEmpty()) {
return url;
}
// Encode keys and values, and join with "&" so no trailing separator is left behind
String queryString = params.entrySet().stream()
.filter(e -> e.getValue() != null)
.map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
+ "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
.collect(Collectors.joining("&"));
return queryString.isEmpty() ? url : url + "?" + queryString;
}
private String getRandomEndpoint() {
List<String> endpoints = config.getQueryEndpoints();
return endpoints.get(random.nextInt(endpoints.size()));
}
private CloseableHttpClient getHttpClient(String endpoint) {
int index = config.getQueryEndpoints().indexOf(endpoint);
return httpClients.get(index % httpClients.size());
}
private List<String> parseLabelValues(ThanosQueryResponse response) {
// Implementation depends on the exact response format
return Collections.emptyList();
}
private Instant getOldestDataPoint(Map<String, ClusterStats> clusterStats) {
return clusterStats.values().stream()
.filter(ClusterStats::isReachable)
.map(ClusterStats::getMinTimestamp)
.filter(Objects::nonNull)
.min(Instant::compareTo)
.orElse(Instant.now());
}
private Instant getNewestDataPoint(Map<String, ClusterStats> clusterStats) {
return clusterStats.values().stream()
.filter(ClusterStats::isReachable)
.map(ClusterStats::getMaxTimestamp)
.filter(Objects::nonNull)
.max(Instant::compareTo)
.orElse(Instant.now());
}
private Map<String, Long> getMetricsDistribution(Instant start, Instant end) {
// Query for metrics count by type
Map<String, Long> distribution = new HashMap<>();
// This would require more complex queries to get actual distribution
distribution.put("counter", 0L);
distribution.put("gauge", 0L);
distribution.put("histogram", 0L);
distribution.put("summary", 0L);
return distribution;
}
}
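`getRandomEndpoint()` spreads load across the configured Query endpoints, but a request still fails if the chosen endpoint is down. One possible extension, sketched here under the assumption that any `RuntimeException` means the endpoint is unavailable, is to try the endpoints in random order until one succeeds. `EndpointFailoverSketch` and `callWithFailover` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class EndpointFailoverSketch {

    // Tries each endpoint in random order and returns the first successful result.
    static <T> T callWithFailover(List<String> endpoints, Function<String, T> call) {
        if (endpoints.isEmpty()) {
            throw new IllegalStateException("No endpoints configured");
        }
        List<String> shuffled = new ArrayList<>(endpoints);
        Collections.shuffle(shuffled);
        RuntimeException last = null;
        for (String endpoint : shuffled) {
            try {
                return call.apply(endpoint);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next endpoint
            }
        }
        throw new IllegalStateException("All endpoints failed", last);
    }
}
```

A caller would pass a lambda that performs the HTTP request against the given endpoint and throws on failure.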
Thanos Store Client (gRPC)
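// StoreGrpc and the Store* request/response types used below are assumed to be classes
// generated from the Thanos StoreAPI protobuf definitions; the actual class and RPC names
// depend on the proto files and code generation options you compile against.
@Service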
@Slf4j
public class ThanosStoreClient {
private final ThanosConfig config;
private final List<ManagedChannel> channels = new ArrayList<>();
private final List<StoreGrpc.StoreBlockingStub> stubs = new ArrayList<>();
public ThanosStoreClient(ThanosConfig config) {
this.config = config;
initializeGrpcChannels();
}
public List<StoreResponse> querySeries(StoreSeriesRequest request) {
List<StoreResponse> allResults = new ArrayList<>();
for (StoreGrpc.StoreBlockingStub stub : stubs) {
try {
Iterator<StoreResponse> response = stub.withDeadlineAfter(
config.getStoreTimeout(), TimeUnit.MILLISECONDS)
.series(request);
while (response.hasNext()) {
allResults.add(response.next());
}
} catch (Exception e) {
log.warn("Failed to query store endpoint", e);
}
}
return allResults;
}
public List<LabelSet> getLabelSets(Instant start, Instant end) {
List<LabelSet> allLabels = new ArrayList<>();
for (StoreGrpc.StoreBlockingStub stub : stubs) {
try {
StoreLabelRequest request = StoreLabelRequest.newBuilder()
.setStart(start.toEpochMilli())
.setEnd(end.toEpochMilli())
.build();
StoreLabelResponse response = stub.withDeadlineAfter(
config.getStoreTimeout(), TimeUnit.MILLISECONDS)
.labels(request);
allLabels.addAll(response.getLabelsList());
} catch (Exception e) {
log.warn("Failed to get labels from store", e);
}
}
return allLabels;
}
private void initializeGrpcChannels() {
for (String endpoint : config.getStoreEndpoints()) {
try {
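ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(endpoint);
if (config.isTlsEnabled()) {
builder.useTransportSecurity();
} else {
builder.usePlaintext(); // Unencrypted; enable TLS in production
}
ManagedChannel channel = builder.build();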
channels.add(channel);
stubs.add(StoreGrpc.newBlockingStub(channel));
log.info("Initialized gRPC channel to Thanos Store: {}", endpoint);
} catch (Exception e) {
log.error("Failed to initialize gRPC channel to: {}", endpoint, e);
}
}
}
@PreDestroy
public void cleanup() {
channels.forEach(ManagedChannel::shutdown);
}
}
Global Metrics Service
@Service
@Slf4j
public class GlobalMetricsService {
private final ThanosQueryClient queryClient;
private final ThanosStoreClient storeClient;
public GlobalMetricsService(ThanosQueryClient queryClient, ThanosStoreClient storeClient) {
this.queryClient = queryClient;
this.storeClient = storeClient;
}
public Map<String, List<MetricSeries>> getCrossClusterMetrics(String metricName,
Instant start, Instant end,
Duration step) {
String query = metricName;
List<MetricSeries> allResults = queryClient.queryRange(query, start, end, step);
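// Group by cluster; series without a "cluster" label go under "unknown"
// because Collectors.groupingBy rejects null keys
return allResults.stream()
.collect(Collectors.groupingBy(series ->
series.getSourceCluster() != null ? series.getSourceCluster() : "unknown"));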
}
public CrossClusterComparison compareClusters(String metricName, List<String> clusters,
Instant start, Instant end) {
Map<String, List<MetricSeries>> clusterData = new HashMap<>();
Map<String, Double> clusterAverages = new HashMap<>();
for (String cluster : clusters) {
String query = String.format("%s{cluster=\"%s\"}", metricName, cluster);
List<MetricSeries> results = queryClient.queryRange(query, start, end, Duration.ofMinutes(5));
if (!results.isEmpty()) {
clusterData.put(cluster, results);
// Calculate average
double average = results.stream()
.flatMap(series -> series.getDataPoints().stream())
.mapToDouble(DataPoint::getValue)
.average()
.orElse(0.0);
clusterAverages.put(cluster, average);
}
}
return new CrossClusterComparison(metricName, clusterData, clusterAverages);
}
public AlertCorrelation findCorrelatedAlerts(Instant start, Instant end) {
// Query firing alerts across all clusters (the ALERTS series also covers pending alerts)
String alertQuery = "ALERTS{alertstate=\"firing\"}";
List<MetricSeries> allAlerts = queryClient.queryRange(alertQuery, start, end, Duration.ofMinutes(1));
// Group alerts by time and find correlations
Map<Instant, List<String>> alertsByTime = new HashMap<>();
for (MetricSeries alert : allAlerts) {
for (DataPoint point : alert.getDataPoints()) {
if (point.getValue() == 1.0) { // Alert firing
alertsByTime
.computeIfAbsent(point.getTimestamp(), k -> new ArrayList<>())
.add(alert.getLabels().get("alertname"));
}
}
}
return new AlertCorrelation(alertsByTime);
}
public GlobalViewStats getGlobalView(Instant start, Instant end) {
return queryClient.getGlobalViewStats(start, end);
}
public List<String> findAnomalousClusters(String metricName, double threshold,
Instant start, Instant end) {
Map<String, List<MetricSeries>> clusterMetrics = getCrossClusterMetrics(
metricName, start, end, Duration.ofMinutes(5));
// Calculate global statistics
DescriptiveStatistics globalStats = new DescriptiveStatistics();
clusterMetrics.values().stream()
.flatMap(List::stream)
.flatMap(series -> series.getDataPoints().stream())
.mapToDouble(DataPoint::getValue)
.forEach(globalStats::addValue);
double globalMean = globalStats.getMean();
double globalStdDev = globalStats.getStandardDeviation();
// Find clusters with values outside threshold
List<String> anomalousClusters = new ArrayList<>();
for (Map.Entry<String, List<MetricSeries>> entry : clusterMetrics.entrySet()) {
String cluster = entry.getKey();
double clusterAvg = entry.getValue().stream()
.flatMap(series -> series.getDataPoints().stream())
.mapToDouble(DataPoint::getValue)
.average()
.orElse(0.0);
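// Guard against zero spread, which would otherwise produce NaN or Infinity
double zScore = globalStdDev == 0.0 ? 0.0 : Math.abs((clusterAvg - globalMean) / globalStdDev);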
if (zScore > threshold) {
anomalousClusters.add(cluster);
}
}
return anomalousClusters;
}
}
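The anomaly check in `findAnomalousClusters` is a z-score test: a cluster is flagged when its mean lies more than `threshold` standard deviations from the mean of all samples. The sketch below reproduces that calculation with the JDK only, so the arithmetic is visible. `ZScoreSketch` is a hypothetical class; like `DescriptiveStatistics`, it uses the sample standard deviation (dividing by n - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ZScoreSketch {

    static double mean(List<Double> xs) {
        return xs.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    // Sample standard deviation (divides by n - 1); 0.0 for fewer than two values.
    static double sampleStdDev(List<Double> xs) {
        if (xs.size() < 2) {
            return 0.0;
        }
        double m = mean(xs);
        double sumSq = xs.stream().mapToDouble(x -> (x - m) * (x - m)).sum();
        return Math.sqrt(sumSq / (xs.size() - 1));
    }

    // Returns clusters whose mean is more than `threshold` standard deviations from the global mean.
    static List<String> anomalous(Map<String, List<Double>> byCluster, double threshold) {
        List<Double> all = new ArrayList<>();
        byCluster.values().forEach(all::addAll);
        double globalMean = mean(all);
        double globalStd = sampleStdDev(all);
        List<String> result = new ArrayList<>();
        if (globalStd == 0.0) {
            return result; // no spread, so nothing can be an outlier
        }
        for (Map.Entry<String, List<Double>> e : byCluster.entrySet()) {
            double z = Math.abs((mean(e.getValue()) - globalMean) / globalStd);
            if (z > threshold) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Because the outlier's own samples are pooled into the global statistics, a single deviating cluster among few clusters inflates the standard deviation and may stay under a threshold of 2.0, so the threshold needs tuning per metric.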
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CrossClusterComparison {
private String metricName;
private Map<String, List<MetricSeries>> clusterData;
private Map<String, Double> clusterAverages;
private Instant comparisonTime;
public CrossClusterComparison(String metricName, Map<String, List<MetricSeries>> clusterData,
Map<String, Double> clusterAverages) {
this.metricName = metricName;
this.clusterData = clusterData;
this.clusterAverages = clusterAverages;
this.comparisonTime = Instant.now();
}
public String getHighestPerformingCluster() {
return clusterAverages.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(null);
}
public String getLowestPerformingCluster() {
return clusterAverages.entrySet().stream()
.min(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(null);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlertCorrelation {
private Map<Instant, List<String>> alertsByTime;
private Map<String, Integer> alertFrequencies;
private List<AlertGroup> correlatedGroups;
public AlertCorrelation(Map<Instant, List<String>> alertsByTime) {
this.alertsByTime = alertsByTime;
this.alertFrequencies = calculateAlertFrequencies();
this.correlatedGroups = findCorrelatedGroups();
}
private Map<String, Integer> calculateAlertFrequencies() {
return alertsByTime.values().stream()
.flatMap(List::stream)
.collect(Collectors.groupingBy(
Function.identity(),
Collectors.summingInt(v -> 1)
));
}
private List<AlertGroup> findCorrelatedGroups() {
// Implementation for finding correlated alert groups
return new ArrayList<>();
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlertGroup {
private Set<String> alertNames;
private int occurrenceCount;
private double correlationScore;
}
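`findCorrelatedGroups()` is left as a stub above. One simple starting point, offered here as an assumption rather than the intended implementation, is to count how often each pair of alerts fires at the same timestamp. `AlertCooccurrenceSketch` and the `A|B` key format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class AlertCooccurrenceSketch {

    // Counts, for every unordered pair of alert names, the timestamps at which both fired.
    // Pair keys are formatted as "first|second" with the names in alphabetical order.
    static Map<String, Integer> pairCounts(Map<?, List<String>> alertsByTime) {
        Map<String, Integer> counts = new TreeMap<>();
        for (List<String> firing : alertsByTime.values()) {
            // De-duplicate and sort so each pair is counted once per timestamp
            List<String> names = new ArrayList<>(new TreeSet<>(firing));
            for (int i = 0; i < names.size(); i++) {
                for (int j = i + 1; j < names.size(); j++) {
                    counts.merge(names.get(i) + "|" + names.get(j), 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Pairs with a high count relative to each alert's individual frequency are candidates for an `AlertGroup`.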
Step 5: REST API Controllers
@RestController
@RequestMapping("/api/thanos")
@Slf4j
public class ThanosController {
@Autowired
private GlobalMetricsService globalMetricsService;
@Autowired
private ThanosQueryClient queryClient;
@PostMapping("/query/range")
public ResponseEntity<?> queryRange(@Valid @RequestBody RangeQueryRequest request) {
try {
List<MetricSeries> results = queryClient.queryRange(
request.getQuery(),
request.getStartTime(),
request.getEndTime(),
request.getStep()
);
return ResponseEntity.ok(results);
} catch (Exception e) {
log.error("Range query failed", e);
return ResponseEntity.badRequest().body(new ErrorResponse("QUERY_RANGE_ERROR", e.getMessage()));
}
}
@PostMapping("/query/instant")
public ResponseEntity<?> queryInstant(@Valid @RequestBody InstantQueryRequest request) {
try {
List<InstantVector> results = queryClient.queryInstant(
request.getQuery(),
request.getTime()
);
return ResponseEntity.ok(results);
} catch (Exception e) {
log.error("Instant query failed", e);
return ResponseEntity.badRequest().body(new ErrorResponse("QUERY_INSTANT_ERROR", e.getMessage()));
}
}
@GetMapping("/global/view")
public ResponseEntity<?> getGlobalView(
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
GlobalViewStats stats = globalMetricsService.getGlobalView(start, end);
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Failed to get global view", e);
return ResponseEntity.badRequest().body(new ErrorResponse("GLOBAL_VIEW_ERROR", e.getMessage()));
}
}
@GetMapping("/metrics/cross-cluster/{metricName}")
public ResponseEntity<?> getCrossClusterMetrics(
@PathVariable String metricName,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end,
@RequestParam(defaultValue = "PT5M") Duration step) { // ISO-8601 duration (5 minutes)
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
Map<String, List<MetricSeries>> results = globalMetricsService.getCrossClusterMetrics(
metricName, start, end, step);
return ResponseEntity.ok(results);
} catch (Exception e) {
log.error("Failed to get cross-cluster metrics", e);
return ResponseEntity.badRequest().body(new ErrorResponse("CROSS_CLUSTER_ERROR", e.getMessage()));
}
}
@GetMapping("/clusters/compare/{metricName}")
public ResponseEntity<?> compareClusters(
@PathVariable String metricName,
@RequestParam List<String> clusters,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
CrossClusterComparison comparison = globalMetricsService.compareClusters(
metricName, clusters, start, end);
return ResponseEntity.ok(comparison);
} catch (Exception e) {
log.error("Failed to compare clusters", e);
return ResponseEntity.badRequest().body(new ErrorResponse("CLUSTER_COMPARE_ERROR", e.getMessage()));
}
}
@GetMapping("/alerts/correlations")
public ResponseEntity<?> getAlertCorrelations(
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(24));
if (end == null) end = Instant.now();
AlertCorrelation correlations = globalMetricsService.findCorrelatedAlerts(start, end);
return ResponseEntity.ok(correlations);
} catch (Exception e) {
log.error("Failed to get alert correlations", e);
return ResponseEntity.badRequest().body(new ErrorResponse("ALERT_CORRELATION_ERROR", e.getMessage()));
}
}
@GetMapping("/clusters/anomalies/{metricName}")
public ResponseEntity<?> findAnomalousClusters(
@PathVariable String metricName,
@RequestParam(defaultValue = "2.0") double threshold,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
if (start == null) start = Instant.now().minus(Duration.ofHours(1));
if (end == null) end = Instant.now();
List<String> anomalies = globalMetricsService.findAnomalousClusters(
metricName, threshold, start, end);
return ResponseEntity.ok(anomalies);
} catch (Exception e) {
log.error("Failed to find anomalous clusters", e);
return ResponseEntity.badRequest().body(new ErrorResponse("ANOMALY_DETECTION_ERROR", e.getMessage()));
}
}
}
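Callers used to Prometheus often write steps as `30s` or `5m` rather than ISO-8601 (`PT5M`). If the API should accept both, a small parser can be applied to a `String` request parameter before calling the service. This is a sketch under the assumption that only single-unit shorthand is needed; `StepParserSketch` is a hypothetical helper.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StepParserSketch {

    private static final Pattern SIMPLE = Pattern.compile("(\\d+)(ms|s|m|h|d)");

    // Accepts ISO-8601 ("PT5M") or a single-unit shorthand ("250ms", "30s", "5m", "1h", "2d").
    static Duration parse(String text) {
        String trimmed = text.trim();
        Matcher m = SIMPLE.matcher(trimmed);
        if (!m.matches()) {
            // Falls back to ISO-8601; throws DateTimeParseException for invalid input
            return Duration.parse(trimmed);
        }
        long n = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(n);
            case "s":
                return Duration.ofSeconds(n);
            case "m":
                return Duration.ofMinutes(n);
            case "h":
                return Duration.ofHours(n);
            default:
                return Duration.ofDays(n); // "d"
        }
    }
}
```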
@Data
class RangeQueryRequest {
@NotBlank
private String query;
@NotNull
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private Instant startTime;
@NotNull
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private Instant endTime;
@NotNull
private Duration step;
}
@Data
class InstantQueryRequest {
@NotBlank
private String query;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private Instant time;
}
@Data
@AllArgsConstructor
class ErrorResponse {
private String errorCode;
private String message;
private LocalDateTime timestamp;
public ErrorResponse(String errorCode, String message) {
this.errorCode = errorCode;
this.message = message;
this.timestamp = LocalDateTime.now();
}
}
Step 6: Exception Handling
public class ThanosException extends RuntimeException {
public ThanosException(String message) {
super(message);
}
public ThanosException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
public class ThanosExceptionHandler {
@ExceptionHandler(ThanosException.class)
public ResponseEntity<ErrorResponse> handleThanosException(ThanosException ex) {
ErrorResponse error = new ErrorResponse("THANOS_ERROR", ex.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
// @Valid on a @RequestBody argument raises MethodArgumentNotValidException
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<ErrorResponse> handleValidationException(MethodArgumentNotValidException ex) {
String message = ex.getBindingResult().getFieldErrors().stream()
.map(error -> error.getField() + ": " + error.getDefaultMessage())
.collect(Collectors.joining(", "));
ErrorResponse error = new ErrorResponse("VALIDATION_ERROR", message);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error);
}
}
Step 7: Configuration File
application.yml:
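thanos:
  # Thanos components listen for HTTP on 10902 and gRPC on 10901 by default;
  # adjust the ports below to match your deployment.
  query-endpoints:
    - "http://thanos-query-1:10904"
    - "http://thanos-query-2:10904"
  store-endpoints:
    - "thanos-store-1:10901"
    - "thanos-store-2:10901"
  query-api-path: "/api/v1"
  query-timeout: 30000
  max-samples: 50000000
  enable-deduplication: true
  enable-partial-response: true
  replica-label: "replica"
  store-timeout: 30000
  store-max-concurrent: 20
  enable-query-cache: true
  query-cache-duration: 300
  query-cache-size: 1000
  # Read from the THANOS_BEARER_TOKEN environment variable; defaults to empty.
  bearer-token: "${THANOS_BEARER_TOKEN:}"
  tls-enabled: false
  enable-metrics: true
  metrics-port: 9090
spring:
  cache:
    # Needs com.github.ben-manes.caffeine:caffeine on the classpath,
    # which is not part of the Step 1 dependency list.
    type: caffeine
logging:
  level:
    com.yourcompany.thanos: DEBUG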
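To make the effect of these settings concrete, the sketch below shows one way the deduplication, partial-response, and replica-label values could end up on a Thanos Query API request. The class and method names (ThanosQueryUrlBuilder, instantQueryUri) are hypothetical and are not part of the client built in the earlier steps; the query parameters dedup, partial_response, and replicaLabels[] are the ones the Thanos Query HTTP API accepts on top of the standard Prometheus query parameter.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: maps the configuration values onto Thanos Query API parameters. */
final class ThanosQueryUrlBuilder {

    private ThanosQueryUrlBuilder() {}

    /** Builds the URI for an instant query against a single Thanos Query endpoint. */
    static URI instantQueryUri(String endpoint, String apiPath, String promQl,
                               boolean dedup, boolean partialResponse, String replicaLabel) {
        // LinkedHashMap keeps the parameters in insertion order, which makes the URL predictable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("query", promQl);
        params.put("dedup", String.valueOf(dedup));
        params.put("partial_response", String.valueOf(partialResponse));
        // The replica label only matters when deduplication is switched on.
        if (dedup && replicaLabel != null && !replicaLabel.isEmpty()) {
            params.put("replicaLabels[]", replicaLabel);
        }
        String queryString = params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
        return URI.create(endpoint + apiPath + "/query?" + queryString);
    }

    // Form-encodes a key or value so PromQL characters such as { } " = survive transport.
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Calling instantQueryUri with the first query endpoint, the "/api/v1" path, and replica label "replica" yields a URL whose query string carries dedup=true, partial_response=true, and the encoded replicaLabels[] parameter. Encoding every key and value matters because PromQL selectors routinely contain braces and quotes.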
Step 8: Usage Examples
@Service
@Slf4j
public class ThanosExampleService {

    @Autowired
    private GlobalMetricsService globalMetricsService;

    // Requires @EnableScheduling on a configuration class; otherwise this method never runs.
    @Scheduled(fixedRate = 60000) // Every minute
    public void monitorGlobalHealth() {
        try {
            // Capture the current time once so both bounds share the same reference point.
            Instant now = Instant.now();
            GlobalViewStats stats = globalMetricsService.getGlobalView(
                    now.minus(Duration.ofMinutes(5)),
                    now
            );
            log.info("Global view - Clusters: {}/{}, Samples: {}",
                    stats.getActiveClusters(), stats.getTotalClusters(), stats.getTotalSamples());
            // Check for unhealthy clusters
            stats.getClusterStats().forEach((cluster, clusterStats) -> {
                if (!clusterStats.isReachable()) {
                    log.warn("Cluster {} is unreachable", cluster);
                // Compare Durations directly: toMinutes() truncates, so 5m59s would not count as > 5.
                } else if (clusterStats.getDataLatency().compareTo(Duration.ofMinutes(5)) > 0) {
                    log.warn("Cluster {} has high data latency: {}",
                            cluster, clusterStats.getDataLatency());
                }
            });
        } catch (Exception e) {
            log.error("Failed to monitor global health", e);
        }
    }

    public void analyzePerformanceAcrossClusters() {
        Instant end = Instant.now();
        Instant start = end.minus(Duration.ofHours(1));
        // Compare CPU usage across clusters.
        // node_cpu_seconds_total is a cumulative counter, so the raw values are only
        // comparable if compareClusters applies rate() or a similar function internally.
        CrossClusterComparison comparison = globalMetricsService.compareClusters(
                "node_cpu_seconds_total",
                Arrays.asList("cluster-1", "cluster-2", "cluster-3"),
                start, end
        );
        log.info("Highest performing cluster: {}", comparison.getHighestPerformingCluster());
        log.info("Lowest performing cluster: {}", comparison.getLowestPerformingCluster());
    }
}
Key Features Implemented
- Global Querying: Query metrics across all Prometheus instances
- Cross-Cluster Analysis: Compare metrics across different clusters
- Alert Correlation: Find correlated alerts across the global deployment
- Anomaly Detection: Identify anomalous clusters using statistical methods
- StoreAPI Integration: Direct gRPC access to Thanos Store nodes
- Caching: Query result caching for performance
- Health Monitoring: Global view of cluster health and data latency
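As an illustration of the kind of statistical method the anomaly detection item refers to, here is a minimal z-score sketch. It is not necessarily the method used by the service in the earlier steps; the class name ClusterAnomalySketch and the method anomalousClusters are hypothetical, and the input is assumed to be one aggregated value per cluster (for example, an average CPU rate already fetched from Thanos).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: flags clusters whose value deviates strongly from the fleet mean. */
final class ClusterAnomalySketch {

    private ClusterAnomalySketch() {}

    /** Returns the clusters whose absolute z-score strictly exceeds zThreshold. */
    static List<String> anomalousClusters(Map<String, Double> valueByCluster, double zThreshold) {
        List<String> anomalous = new ArrayList<>();
        int n = valueByCluster.size();
        if (n < 2) {
            return anomalous; // a single value has nothing to be compared against
        }
        double mean = valueByCluster.values().stream()
                .mapToDouble(Double::doubleValue).average().orElse(0.0);
        // Population variance: divide by n, not n - 1.
        double variance = valueByCluster.values().stream()
                .mapToDouble(v -> (v - mean) * (v - mean)).sum() / n;
        double stdDev = Math.sqrt(variance);
        if (stdDev == 0.0) {
            return anomalous; // all clusters report the same value
        }
        for (Map.Entry<String, Double> entry : valueByCluster.entrySet()) {
            double z = (entry.getValue() - mean) / stdDev;
            if (Math.abs(z) > zThreshold) {
                anomalous.add(entry.getKey());
            }
        }
        return anomalous;
    }
}
```

One caveat with this approach: with the population standard deviation, no z-score among n values can exceed the square root of n - 1, so the conventional threshold of 3 can never fire for fewer than 11 clusters. For small fleets, a lower threshold or a median-based measure is the more practical choice.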
Best Practices
- Load Balancing: Distribute queries across multiple Thanos Query instances
- Caching Strategy: Implement appropriate cache durations for different query types
- Error Handling: Graceful degradation when stores are unavailable
- Monitoring: Monitor Thanos client performance and error rates
- Security: Use TLS and authentication for production deployments
- Resource Management: Proper cleanup of gRPC channels and HTTP clients
- Query Optimization: Use appropriate step sizes and time ranges
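The load balancing and error handling practices above can be combined in a small round-robin selector that falls back to the next endpoint when a call fails. The sketch below is one possible shape, assuming the endpoint list comes from the query-endpoints setting; RoundRobinEndpoints and execute are hypothetical names, and the actual HTTP call is passed in as a function so the class stays independent of any particular client.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: rotates across Thanos Query endpoints and fails over on errors. */
final class RoundRobinEndpoints {

    private final List<String> endpoints;
    private final AtomicInteger cursor = new AtomicInteger();

    RoundRobinEndpoints(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        this.endpoints = List.copyOf(endpoints);
    }

    /**
     * Runs the call against the next endpoint in rotation. If it throws, each remaining
     * endpoint is tried once before giving up.
     */
    <T> T execute(Function<String, T> call) {
        // floorMod keeps the index non-negative even after the counter overflows.
        int start = Math.floorMod(cursor.getAndIncrement(), endpoints.size());
        RuntimeException last = null;
        for (int i = 0; i < endpoints.size(); i++) {
            String endpoint = endpoints.get((start + i) % endpoints.size());
            try {
                return call.apply(endpoint);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next endpoint
            }
        }
        throw new IllegalStateException("all Thanos Query endpoints failed", last);
    }
}
```

Successive calls start at alternating endpoints, so load spreads evenly while both are healthy, and a single unavailable instance costs one failed attempt rather than a failed query. A production version would likely add a short cool-down for endpoints that keep failing.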
Together, these pieces give a Java service a single entry point into Thanos: global queries across all Prometheus instances, cross-cluster comparison, cached query results, and scheduled health checks on cluster reachability and data latency.