InfluxDB is a high-performance time series database designed for handling high write and query loads. This guide demonstrates how to integrate InfluxDB with Java applications for storing, querying, and analyzing time series data.
Why Use InfluxDB for Time Series?
- High Performance: Optimized for time-stamped data
- SQL-like Query Language: InfluxQL and Flux for complex queries
- Continuous Queries: Automatic downsampling and aggregation
- Retention Policies: Automatic data expiration
- High Availability: Clustering support for enterprise needs
- Rich Ecosystem: Integrations with Grafana, Telegraf, and more
Prerequisites
- A running InfluxDB 2.x instance (a 1.x fallback is also shown)
- Java 8+ with HTTP client capabilities
- Maven/Gradle for dependency management
Step 1: Project Dependencies
Maven (pom.xml):
<dependencies>
    <!-- InfluxDB Java Client (2.x) -->
    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb-client-java</artifactId>
        <version>6.9.0</version>
    </dependency>
    <!-- Alternatively, for InfluxDB 1.x -->
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.23</version>
    </dependency>
    <!-- HTTP Client -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <!-- Spring Boot (Optional) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- Cache -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-cache</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- Metrics (Optional) -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
        <version>1.10.5</version>
    </dependency>
    <!-- Apache Commons Math (statistics and regression) -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-math3</artifactId>
        <version>3.6.1</version>
    </dependency>
</dependencies>
Step 2: Configuration Classes
@Configuration
@ConfigurationProperties(prefix = "influxdb")
@Data
public class InfluxDBConfig {
// InfluxDB 2.x configuration
private String url = "http://localhost:8086";
private String token;
private String org;
private String bucket;
private String username; // For InfluxDB 1.x
private String password; // For InfluxDB 1.x
private String database; // For InfluxDB 1.x
// Connection settings
private int connectTimeout = 10000;
private int readTimeout = 30000;
private int writeTimeout = 10000;
private int maxRetries = 3;
private int batchSize = 1000;
private int flushInterval = 1000;
private boolean gzipEnabled = true;
// Query settings
private int queryChunkSize = 10000;
private String queryLanguage = "flux"; // "flux" or "influxql"
public boolean isV1() {
return database != null && !database.isEmpty();
}
}
@Component
@Slf4j
public class InfluxDBClientFactory {
private final InfluxDBConfig config;
public InfluxDBClientFactory(InfluxDBConfig config) {
this.config = config;
}
public InfluxDBClient createClient() {
try {
InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder()
.url(config.getUrl())
.authenticateToken(config.getToken().toCharArray())
.org(config.getOrg())
.bucket(config.getBucket())
// Timeouts are configured on the underlying OkHttp client
.okHttpClient(new OkHttpClient.Builder()
.connectTimeout(config.getConnectTimeout(), TimeUnit.MILLISECONDS)
.readTimeout(config.getReadTimeout(), TimeUnit.MILLISECONDS)
.writeTimeout(config.getWriteTimeout(), TimeUnit.MILLISECONDS));
// Fully qualified because this class shadows the library factory of the same name
InfluxDBClient client = com.influxdb.client.InfluxDBClientFactory.create(optionsBuilder.build());
if (config.isGzipEnabled()) {
client.enableGzip();
}
log.info("InfluxDB 2.x client created successfully for URL: {}", config.getUrl());
return client;
} catch (Exception e) {
throw new RuntimeException("Failed to create InfluxDB client", e);
}
}
@Deprecated
public org.influxdb.InfluxDB createV1Client() {
try {
org.influxdb.InfluxDB influxDB = org.influxdb.InfluxDBFactory.connect(
config.getUrl(), config.getUsername(), config.getPassword());
influxDB.setDatabase(config.getDatabase());
influxDB.setRetentionPolicy("autogen");
influxDB.enableBatch(config.getBatchSize(), config.getFlushInterval(), TimeUnit.MILLISECONDS);
log.info("InfluxDB 1.x client created successfully for database: {}", config.getDatabase());
return influxDB;
} catch (Exception e) {
throw new RuntimeException("Failed to create InfluxDB 1.x client", e);
}
}
public void closeClient(InfluxDBClient client) {
if (client != null) {
client.close();
}
}
}
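Before wiring the factory into services, it can help to verify connectivity. A minimal sketch, assuming the configuration bean above and influxdb-client-java 6.x, where ping() returns true when the server responds:
InfluxDBClientFactory factory = new InfluxDBClientFactory(config);
InfluxDBClient client = factory.createClient();
// ping() returns true when the InfluxDB instance is reachable (client 6.x)
if (Boolean.TRUE.equals(client.ping())) {
    System.out.println("Connected to InfluxDB at " + config.getUrl());
} else {
    System.err.println("InfluxDB at " + config.getUrl() + " is not reachable");
}
factory.closeClient(client);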
Step 3: Core Data Models
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TimeSeriesPoint {
private String measurement;
private Map<String, String> tags;
private Map<String, Object> fields;
private Instant timestamp;
private Long nanosecondPrecision;
public TimeSeriesPoint(String measurement, Map<String, String> tags,
Map<String, Object> fields, Instant timestamp) {
this.measurement = measurement;
this.tags = tags != null ? tags : new HashMap<>();
this.fields = fields != null ? fields : new HashMap<>();
this.timestamp = timestamp != null ? timestamp : Instant.now();
}
public void addTag(String key, String value) {
if (tags == null) {
tags = new HashMap<>();
}
tags.put(key, value);
}
public void addField(String key, Object value) {
if (fields == null) {
fields = new HashMap<>();
}
fields.put(key, value);
}
public Point toInfluxPoint() {
// Builds a com.influxdb.client.write.Point for the 2.x WriteApi
Point point = Point.measurement(measurement)
.time(timestamp, WritePrecision.MS);
if (tags != null) {
tags.forEach(point::addTag);
}
if (fields != null) {
fields.forEach((key, value) -> {
if (value instanceof Number) {
point.addField(key, (Number) value);
} else if (value instanceof String) {
point.addField(key, (String) value);
} else if (value instanceof Boolean) {
point.addField(key, (Boolean) value);
}
});
}
return point;
}
}
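To sanity-check how a point will be serialized before writing it, the 2.x client's Point exposes toLineProtocol(). A short sketch using the model above; the measurement, tag, and field names are illustrative:
TimeSeriesPoint point = new TimeSeriesPoint("cpu", null, null, Instant.now());
point.addTag("host", "server-1");
point.addField("usage", 42.5);
// Prints line protocol, e.g.: cpu,host=server-1 usage=42.5 1700000000000
System.out.println(point.toInfluxPoint().toLineProtocol());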
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QueryRequest {
private String query;
private String range; // e.g., "1h", "24h", "7d"
private Instant startTime;
private Instant endTime;
private Map<String, String> filters;
private Integer limit;
private String aggregation; // "mean", "sum", "count", "max", "min"
private String window; // for grouping, e.g., "1m", "5m", "1h"
public boolean hasTimeRange() {
return startTime != null && endTime != null;
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class QueryResult {
private boolean success;
private List<TimeSeriesPoint> data;
private String errorMessage;
private long executionTime;
private Map<String, Object> statistics;
public static QueryResult success(List<TimeSeriesPoint> data) {
return QueryResult.builder()
.success(true)
.data(data)
.build();
}
public static QueryResult error(String errorMessage) {
return QueryResult.builder()
.success(false)
.errorMessage(errorMessage)
.data(Collections.emptyList())
.build();
}
}
@Data
public class TimeSeriesStatistics {
private String measurement;
private long totalPoints;
private Instant firstTimestamp;
private Instant lastTimestamp;
private Map<String, Object> fieldStats; // min, max, mean, stddev for numeric fields
private Map<String, Integer> tagCardinality;
public TimeSeriesStatistics(String measurement) {
this.measurement = measurement;
this.fieldStats = new HashMap<>();
this.tagCardinality = new HashMap<>();
}
}
Step 4: Core Service Classes
Write Service
@Service
@Slf4j
public class InfluxWriteService {
private final InfluxDBConfig config;
private final InfluxDBClient influxClient;
private final WriteApi writeApi;
public InfluxWriteService(InfluxDBConfig config, InfluxDBClientFactory clientFactory) {
this.config = config;
this.influxClient = clientFactory.createClient();
// makeWriteApi() is the batching, asynchronous write API; it replaces the deprecated getWriteApi()
this.writeApi = influxClient.makeWriteApi();
// Set up write error handling
setupErrorHandling();
}
public void writePoint(TimeSeriesPoint point) {
try {
Point influxPoint = point.toInfluxPoint();
writeApi.writePoint(influxPoint);
log.debug("Written point to measurement: {}", point.getMeasurement());
} catch (Exception e) {
log.error("Failed to write point to measurement: {}", point.getMeasurement(), e);
throw new InfluxDBException("Write operation failed", e);
}
}
public void writePoints(List<TimeSeriesPoint> points) {
if (points == null || points.isEmpty()) {
return;
}
try {
List<Point> influxPoints = points.stream()
.map(TimeSeriesPoint::toInfluxPoint)
.collect(Collectors.toList());
writeApi.writePoints(influxPoints);
log.debug("Written {} points to InfluxDB", points.size());
} catch (Exception e) {
log.error("Failed to write {} points to InfluxDB", points.size(), e);
throw new InfluxDBException("Batch write operation failed", e);
}
}
public void writeRecord(String record) {
try {
writeApi.writeRecord(record);
log.debug("Written record: {}", record);
} catch (Exception e) {
log.error("Failed to write record: {}", record, e);
throw new InfluxDBException("Record write operation failed", e);
}
}
public void writeRecords(List<String> records) {
if (records == null || records.isEmpty()) {
return;
}
try {
writeApi.writeRecords(records);
log.debug("Written {} records to InfluxDB", records.size());
} catch (Exception e) {
log.error("Failed to write {} records to InfluxDB", records.size(), e);
throw new InfluxDBException("Batch record write operation failed", e);
}
}
public void writeWithRetry(TimeSeriesPoint point, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
writePoint(point);
return;
} catch (Exception e) {
attempt++;
if (attempt >= maxRetries) {
throw e;
}
log.warn("Write attempt {} failed, retrying...", attempt);
try {
Thread.sleep(100L * (1L << (attempt - 1))); // Exponential backoff: 100ms, 200ms, 400ms...
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InfluxDBException("Write interrupted", ie);
}
}
}
}
private void setupErrorHandling() {
// The batching WriteApi reports outcomes via events rather than callbacks
writeApi.listenEvents(WriteErrorEvent.class, event ->
log.error("Write failed: {}", event.getThrowable().getMessage()));
writeApi.listenEvents(WriteSuccessEvent.class, event ->
log.debug("Successfully wrote batch: {}", event.getLineProtocol()));
}
@PreDestroy
public void cleanup() {
if (writeApi != null) {
writeApi.close();
}
if (influxClient != null) {
influxClient.close();
}
}
}
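The writeRecord and writeRecords methods expect InfluxDB line protocol: a measurement, optional comma-separated tags, a space-separated field set, and an optional timestamp. A brief sketch with illustrative measurement and tag names:
// Line protocol: <measurement>[,<tag>=<value>...] <field>=<value>[,...] [timestamp]
writeService.writeRecord("temperature,location=office value=22.7");
writeService.writeRecords(Arrays.asList(
    "temperature,location=office value=22.9",
    "temperature,location=lab value=19.4"));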
Query Service
@Service
@Slf4j
public class InfluxQueryService {
private final InfluxDBConfig config;
private final InfluxDBClient influxClient;
private final QueryApi queryApi;
public InfluxQueryService(InfluxDBConfig config, InfluxDBClientFactory clientFactory) {
this.config = config;
this.influxClient = clientFactory.createClient();
this.queryApi = influxClient.getQueryApi();
}
public QueryResult executeQuery(QueryRequest request) {
long startTime = System.currentTimeMillis();
try {
String query = buildQuery(request);
log.debug("Executing query: {}", query);
List<FluxTable> tables = queryApi.query(query);
List<TimeSeriesPoint> results = parseFluxTables(tables);
long executionTime = System.currentTimeMillis() - startTime;
return QueryResult.builder()
.success(true)
.data(results)
.executionTime(executionTime)
.statistics(collectQueryStats(results))
.build();
} catch (Exception e) {
log.error("Query execution failed", e);
long executionTime = System.currentTimeMillis() - startTime;
return QueryResult.builder()
.success(false)
.errorMessage(e.getMessage())
.executionTime(executionTime)
.data(Collections.emptyList())
.build();
}
}
public List<TimeSeriesPoint> queryRaw(String fluxQuery) {
try {
List<FluxTable> tables = queryApi.query(fluxQuery);
return parseFluxTables(tables);
} catch (Exception e) {
log.error("Raw query execution failed", e);
throw new InfluxDBException("Query execution failed", e);
}
}
public TimeSeriesStatistics getMeasurementStats(String measurement, Instant start, Instant end) {
String query = String.format(
"from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"%s\") " +
"|> group() " +
"|> count()",
config.getBucket(),
start != null ? start.toString() : "-30d",
end != null ? end.toString() : "now()",
measurement
);
List<TimeSeriesPoint> results = queryRaw(query);
TimeSeriesStatistics stats = new TimeSeriesStatistics(measurement);
if (!results.isEmpty()) {
// Simplified: the grouped count() query returns a single record whose value is the total
Object count = results.get(0).getFields().values().stream().findFirst().orElse(null);
stats.setTotalPoints(count instanceof Number ? ((Number) count).longValue() : results.size());
}
return stats;
}
public Map<String, List<TimeSeriesPoint>> getFieldValues(String measurement, String field,
Instant start, Instant end,
String window) {
String query = String.format(
"from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"%s\") " +
"|> aggregateWindow(every: %s, fn: mean, createEmpty: false) " +
"|> yield(name: \"mean\")",
config.getBucket(),
start != null ? start.toString() : "-1h",
end != null ? end.toString() : "now()",
measurement, field, window != null ? window : "1m"
);
List<TimeSeriesPoint> results = queryRaw(query);
// Group by tags if needed
return results.stream()
.collect(Collectors.groupingBy(point ->
point.getTags() != null ? point.getTags().toString() : "default"));
}
private String buildQuery(QueryRequest request) {
if (request.getQuery() != null) {
return request.getQuery(); // Use raw query if provided
}
StringBuilder fluxBuilder = new StringBuilder();
fluxBuilder.append(String.format("from(bucket: \"%s\")", config.getBucket()));
// Add time range
if (request.hasTimeRange()) {
fluxBuilder.append(String.format("|> range(start: %s, stop: %s)",
request.getStartTime(), request.getEndTime()));
} else if (request.getRange() != null) {
fluxBuilder.append(String.format("|> range(start: -%s)", request.getRange()));
} else {
fluxBuilder.append("|> range(start: -1h)");
}
// Add filters
if (request.getFilters() != null && !request.getFilters().isEmpty()) {
request.getFilters().forEach((key, value) -> {
fluxBuilder.append(String.format("|> filter(fn: (r) => r.%s == \"%s\")", key, value));
});
}
// Add aggregation if specified
if (request.getAggregation() != null && request.getWindow() != null) {
fluxBuilder.append(String.format("|> aggregateWindow(every: %s, fn: %s, createEmpty: false)",
request.getWindow(), request.getAggregation()));
}
// Add limit if specified
if (request.getLimit() != null) {
fluxBuilder.append(String.format("|> limit(n: %d)", request.getLimit()));
}
return fluxBuilder.toString();
}
private List<TimeSeriesPoint> parseFluxTables(List<FluxTable> tables) {
List<TimeSeriesPoint> points = new ArrayList<>();
for (FluxTable table : tables) {
List<FluxRecord> records = table.getRecords();
for (FluxRecord record : records) {
TimeSeriesPoint point = new TimeSeriesPoint();
point.setMeasurement(record.getMeasurement());
point.setTimestamp(record.getTime());
// Parse tags: tag columns have no leading underscore; "result" and "table"
// are Flux bookkeeping columns, and underscore-prefixed keys
// (_measurement, _field, _value, _time, _start, _stop) are system columns
Map<String, String> tags = new HashMap<>();
record.getValues().forEach((key, value) -> {
if (!key.startsWith("_") && !key.equals("result") && !key.equals("table") && value != null) {
tags.put(key, value.toString());
}
});
point.setTags(tags);
// Parse fields
Map<String, Object> fields = new HashMap<>();
String fieldName = record.getField();
Object fieldValue = record.getValue();
if (fieldName != null && fieldValue != null) {
fields.put(fieldName, fieldValue);
}
point.setFields(fields);
points.add(point);
}
}
return points;
}
private Map<String, Object> collectQueryStats(List<TimeSeriesPoint> results) {
Map<String, Object> stats = new HashMap<>();
stats.put("resultCount", results.size());
if (!results.isEmpty()) {
// Calculate time range
Instant minTime = results.stream()
.map(TimeSeriesPoint::getTimestamp)
.filter(Objects::nonNull) // aggregate rows may carry no _time column
.min(Instant::compareTo)
.orElse(Instant.now());
Instant maxTime = results.stream()
.map(TimeSeriesPoint::getTimestamp)
.filter(Objects::nonNull)
.max(Instant::compareTo)
.orElse(Instant.now());
stats.put("timeRange", Duration.between(minTime, maxTime).toString());
stats.put("minTimestamp", minTime);
stats.put("maxTimestamp", maxTime);
}
return stats;
}
}
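For queries the builder cannot express, queryRaw accepts a full Flux script. A minimal sketch inside a @Slf4j component; the bucket name should match the configured bucket, and the measurement follows the Step 8 example:
String flux =
    "from(bucket: \"your-bucket\")\n" +
    "  |> range(start: -15m)\n" +
    "  |> filter(fn: (r) => r._measurement == \"system_metrics\" and r._field == \"cpu_usage\")\n" +
    "  |> mean()";
List<TimeSeriesPoint> points = queryService.queryRaw(flux);
points.forEach(p -> log.info("{} -> {}", p.getTimestamp(), p.getFields()));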
Analytics Service
@Service
@Slf4j
public class TimeSeriesAnalyticsService {
private final InfluxQueryService queryService;
public TimeSeriesAnalyticsService(InfluxQueryService queryService) {
this.queryService = queryService;
}
public Map<String, Object> calculateBasicStats(String measurement, String field,
Instant start, Instant end) {
List<TimeSeriesPoint> data = queryService.getFieldValues(measurement, field, start, end, null)
.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
if (data.isEmpty()) {
return Collections.emptyMap();
}
// Extract numeric values
List<Double> values = data.stream()
.map(point -> point.getFields().get(field))
.filter(value -> value instanceof Number)
.map(value -> ((Number) value).doubleValue())
.collect(Collectors.toList());
if (values.isEmpty()) {
return Collections.emptyMap();
}
// Calculate statistics using Apache Commons Math
DescriptiveStatistics stats = new DescriptiveStatistics();
values.forEach(stats::addValue);
Map<String, Object> result = new HashMap<>();
result.put("count", stats.getN());
result.put("mean", stats.getMean());
result.put("stdDev", stats.getStandardDeviation());
result.put("min", stats.getMin());
result.put("max", stats.getMax());
result.put("median", stats.getPercentile(50));
result.put("q1", stats.getPercentile(25));
result.put("q3", stats.getPercentile(75));
return result;
}
public List<TimeSeriesPoint> detectAnomalies(String measurement, String field,
Instant start, Instant end,
double threshold) {
List<TimeSeriesPoint> data = queryService.getFieldValues(measurement, field, start, end, "5m")
.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
if (data.isEmpty()) {
return Collections.emptyList();
}
// Simple anomaly detection using Z-score
List<Double> values = data.stream()
.map(point -> point.getFields().get(field))
.filter(value -> value instanceof Number)
.map(value -> ((Number) value).doubleValue())
.collect(Collectors.toList());
DescriptiveStatistics stats = new DescriptiveStatistics();
values.forEach(stats::addValue);
double mean = stats.getMean();
double stdDev = stats.getStandardDeviation();
List<TimeSeriesPoint> anomalies = new ArrayList<>();
if (stdDev == 0) {
return anomalies; // constant series: Z-scores are undefined
}
for (TimeSeriesPoint point : data) {
Object raw = point.getFields().get(field);
if (!(raw instanceof Number)) {
continue; // skip non-numeric points instead of misaligning indices
}
double zScore = Math.abs((((Number) raw).doubleValue() - mean) / stdDev);
if (zScore > threshold) {
point.addField("z_score", zScore);
anomalies.add(point);
}
}
return anomalies;
}
public Map<String, Object> forecast(String measurement, String field,
Instant start, Instant end, int periods) {
// Simple linear regression forecasting
List<TimeSeriesPoint> data = queryService.getFieldValues(measurement, field, start, end, "1h")
.values().stream()
.flatMap(List::stream)
.sorted(Comparator.comparing(TimeSeriesPoint::getTimestamp)) // regression needs chronological order
.collect(Collectors.toList());
if (data.size() < 2) {
throw new IllegalArgumentException("Insufficient data for forecasting");
}
// Prepare data for regression
double[] x = new double[data.size()];
double[] y = new double[data.size()];
for (int i = 0; i < data.size(); i++) {
x[i] = i; // Time index
Object value = data.get(i).getFields().get(field);
y[i] = value instanceof Number ? ((Number) value).doubleValue() : 0;
}
// Perform linear regression
SimpleRegression regression = new SimpleRegression();
for (int i = 0; i < x.length; i++) {
regression.addData(x[i], y[i]);
}
// Generate forecast
List<TimeSeriesPoint> forecast = new ArrayList<>();
for (int i = 0; i < periods; i++) {
double forecastValue = regression.predict(x.length + i);
TimeSeriesPoint point = new TimeSeriesPoint();
point.setMeasurement(measurement + "_forecast");
point.addField(field, forecastValue);
point.addField("confidence", regression.getR());
point.setTimestamp(Instant.now().plusSeconds(i * 3600)); // Hourly intervals
forecast.add(point);
}
Map<String, Object> result = new HashMap<>();
result.put("forecast", forecast);
result.put("rSquared", regression.getRSquare());
result.put("slope", regression.getSlope());
result.put("intercept", regression.getIntercept());
return result;
}
}
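A short usage sketch for the analytics service; the measurement and field names follow the Step 8 example, and 3.0 is the conventional Z-score cutoff:
Instant end = Instant.now();
Instant start = end.minus(Duration.ofHours(24));
Map<String, Object> stats = analyticsService.calculateBasicStats("system_metrics", "cpu_usage", start, end);
log.info("CPU mean={}, stdDev={}", stats.get("mean"), stats.get("stdDev"));
List<TimeSeriesPoint> anomalies = analyticsService.detectAnomalies("system_metrics", "cpu_usage", start, end, 3.0);
log.info("Found {} anomalous points in the last 24h", anomalies.size());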
Step 5: REST API Controllers
@RestController
@RequestMapping("/api/timeseries")
@Slf4j
public class TimeSeriesController {
@Autowired
private InfluxWriteService writeService;
@Autowired
private InfluxQueryService queryService;
@Autowired
private TimeSeriesAnalyticsService analyticsService;
@PostMapping("/write")
public ResponseEntity<?> writeDataPoint(@Valid @RequestBody TimeSeriesPoint point) {
try {
writeService.writePoint(point);
return ResponseEntity.ok("Data point written successfully");
} catch (Exception e) {
log.error("Failed to write data point", e);
return ResponseEntity.badRequest().body(new ErrorResponse("WRITE_ERROR", e.getMessage()));
}
}
@PostMapping("/write/batch")
public ResponseEntity<?> writeDataPoints(@Valid @RequestBody List<TimeSeriesPoint> points) {
try {
writeService.writePoints(points);
return ResponseEntity.ok(String.format("%d data points written successfully", points.size()));
} catch (Exception e) {
log.error("Failed to write data points", e);
return ResponseEntity.badRequest().body(new ErrorResponse("BATCH_WRITE_ERROR", e.getMessage()));
}
}
@PostMapping("/query")
public ResponseEntity<?> executeQuery(@Valid @RequestBody QueryRequest request) {
try {
QueryResult result = queryService.executeQuery(request);
return ResponseEntity.ok(result);
} catch (Exception e) {
log.error("Query execution failed", e);
return ResponseEntity.badRequest().body(new ErrorResponse("QUERY_ERROR", e.getMessage()));
}
}
@GetMapping("/measurements/{measurement}/stats")
public ResponseEntity<?> getMeasurementStats(
@PathVariable String measurement,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
TimeSeriesStatistics stats = queryService.getMeasurementStats(measurement, start, end);
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Failed to get measurement statistics", e);
return ResponseEntity.badRequest().body(new ErrorResponse("STATS_ERROR", e.getMessage()));
}
}
@GetMapping("/analytics/stats/{measurement}/{field}")
public ResponseEntity<?> getFieldStatistics(
@PathVariable String measurement,
@PathVariable String field,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
Map<String, Object> stats = analyticsService.calculateBasicStats(measurement, field, start, end);
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Failed to calculate field statistics", e);
return ResponseEntity.badRequest().body(new ErrorResponse("ANALYTICS_ERROR", e.getMessage()));
}
}
@GetMapping("/analytics/anomalies/{measurement}/{field}")
public ResponseEntity<?> detectAnomalies(
@PathVariable String measurement,
@PathVariable String field,
@RequestParam(defaultValue = "3.0") double threshold,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
List<TimeSeriesPoint> anomalies = analyticsService.detectAnomalies(
measurement, field, start, end, threshold);
return ResponseEntity.ok(anomalies);
} catch (Exception e) {
log.error("Failed to detect anomalies", e);
return ResponseEntity.badRequest().body(new ErrorResponse("ANOMALY_DETECTION_ERROR", e.getMessage()));
}
}
@GetMapping("/analytics/forecast/{measurement}/{field}")
public ResponseEntity<?> generateForecast(
@PathVariable String measurement,
@PathVariable String field,
@RequestParam(defaultValue = "10") int periods,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant start,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) Instant end) {
try {
Map<String, Object> forecast = analyticsService.forecast(
measurement, field, start, end, periods);
return ResponseEntity.ok(forecast);
} catch (Exception e) {
log.error("Failed to generate forecast", e);
return ResponseEntity.badRequest().body(new ErrorResponse("FORECAST_ERROR", e.getMessage()));
}
}
}
@Data
@AllArgsConstructor
class ErrorResponse {
private String errorCode;
private String message;
private LocalDateTime timestamp;
public ErrorResponse(String errorCode, String message) {
this.errorCode = errorCode;
this.message = message;
this.timestamp = LocalDateTime.now();
}
}
Step 6: Exception Handling
public class InfluxDBException extends RuntimeException {
public InfluxDBException(String message) {
super(message);
}
public InfluxDBException(String message, Throwable cause) {
super(message, cause);
}
}
@ControllerAdvice
public class InfluxDBExceptionHandler {
@ExceptionHandler(InfluxDBException.class)
public ResponseEntity<ErrorResponse> handleInfluxDBException(InfluxDBException ex) {
ErrorResponse error = new ErrorResponse("INFLUXDB_ERROR", ex.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
@ExceptionHandler(ConstraintViolationException.class)
public ResponseEntity<ErrorResponse> handleValidationException(ConstraintViolationException ex) {
String message = ex.getConstraintViolations().stream()
.map(ConstraintViolation::getMessage)
.collect(Collectors.joining(", "));
ErrorResponse error = new ErrorResponse("VALIDATION_ERROR", message);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error);
}
}
Step 7: Configuration File
application.yml:
influxdb:
  url: "http://localhost:8086"
  token: "${INFLUXDB_TOKEN:your-token-here}"
  org: "your-org"
  bucket: "your-bucket"
  username: "${INFLUXDB_USERNAME:admin}" # For InfluxDB 1.x
  password: "${INFLUXDB_PASSWORD:password}" # For InfluxDB 1.x
  # database: "your-database" # For InfluxDB 1.x only; leave unset for 2.x (isV1() keys off this)
  connect-timeout: 10000
  read-timeout: 30000
  write-timeout: 10000
  max-retries: 3
  batch-size: 1000
  flush-interval: 1000
  gzip-enabled: true
  query-chunk-size: 10000
  query-language: "flux"
spring:
  cache:
    type: caffeine
logging:
  level:
    com.yourcompany.influxdb: DEBUG
Step 8: Usage Examples
@Service
@Slf4j
public class TimeSeriesExampleService {
@Autowired
private InfluxWriteService writeService;
@Autowired
private InfluxQueryService queryService;
@Scheduled(fixedRate = 60000) // Every minute
public void collectSystemMetrics() {
// Collect and write system metrics
// Lombok's @Builder exposes the declared fields, not the addTag/addField
// helpers, so construct the point first and then attach tags and fields
TimeSeriesPoint cpuPoint = new TimeSeriesPoint("system_metrics", null, null, Instant.now());
cpuPoint.addTag("host", "server-1");
cpuPoint.addTag("environment", "production");
cpuPoint.addField("cpu_usage", getCpuUsage());
cpuPoint.addField("memory_usage", getMemoryUsage());
cpuPoint.addField("disk_usage", getDiskUsage());
writeService.writePoint(cpuPoint);
}
public void analyzePerformance() {
// Query recent performance data
QueryRequest request = QueryRequest.builder()
.range("1h")
.filters(Map.of("host", "server-1"))
.aggregation("mean")
.window("5m")
.build();
QueryResult result = queryService.executeQuery(request);
if (result.isSuccess()) {
log.info("Retrieved {} data points", result.getData().size());
// Process the results
}
}
private double getCpuUsage() {
// Implementation to get CPU usage
return Math.random() * 100;
}
private double getMemoryUsage() {
// Implementation to get memory usage
return Math.random() * 100;
}
private double getDiskUsage() {
// Implementation to get disk usage
return Math.random() * 100;
}
}
Key Features Implemented
- Data Writing: Single and batch point writing with error handling
- Query Execution: Flexible query building and execution with Flux/InfluxQL
- Analytics: Statistical analysis, anomaly detection, and forecasting
- Error Handling: Comprehensive exception handling and retry mechanisms
- Performance: Asynchronous batch writing with configurable batch size and flush interval
- Monitoring: Query statistics and performance metrics
Best Practices
- Batch Writing: Use batch operations for high-volume data
- Tag Design: Use tags for efficient querying and indexing
- Retention Policies: Configure appropriate data retention (see the sketch after this list)
- Error Handling: Implement robust error handling and retry logic
- Monitoring: Monitor write and query performance
- Security: Secure InfluxDB tokens and credentials
- Backup: Regular backups of important time series data
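As referenced in the retention item above, InfluxDB 2.x retention is set per bucket and can be configured from Java through the BucketsApi. A hedged sketch using the client from Step 2 and assuming an existing organization ID; the bucket name and 30-day window are illustrative:
BucketsApi bucketsApi = influxClient.getBucketsApi();
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(30 * 24 * 3600); // expire points older than 30 days
// "metrics-30d" and "your-org-id" are placeholders
Bucket bucket = bucketsApi.createBucket("metrics-30d", retention, "your-org-id");
log.info("Created bucket {} with 30-day retention", bucket.getName());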
This comprehensive InfluxDB integration provides a robust foundation for building time series applications in Java, enabling efficient data storage, complex queries, and advanced analytics for time-stamped data.