IoT Sensor Data Processing in Java: A Complete Guide for Real-time Data Handling

IoT (Internet of Things) sensor data processing involves collecting, analyzing, and acting upon data from connected devices. This comprehensive guide covers the entire pipeline from data ingestion to real-time analytics.


IoT Architecture Overview

Key Components:

  • Sensors/Devices: Data generators (temperature, humidity, motion, etc.)
  • Data Ingestion: Message brokers and protocols
  • Processing Engine: Real-time and batch processing
  • Storage: Time-series databases and data lakes
  • Analytics: Real-time monitoring and machine learning
  • Visualization: Dashboards and alerts

Dependencies and Setup

Maven Dependencies
<!-- Version properties shared by the dependency declarations below. -->
<properties>
<spring-boot.version>3.1.0</spring-boot.version>
<!-- NOTE(review): this value is applied to the spring-kafka artifact below; confirm that a
     spring-kafka release with this number exists, since spring-kafka and kafka-clients are
     versioned independently. -->
<kafka.version>3.4.0</kafka.version>
<influxdb.version>2.7.0</influxdb.version>
<!-- Despite its name, this property is the version of the Eclipse Paho MQTT client declared below. -->
<mosquitto.version>1.2.5</mosquitto.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Message Brokers -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- MQTT client library (Eclipse Paho, MQTT v3) -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mosquitto.version}</version>
</dependency>
<!-- Databases -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Commons Math is the only dependency with an inline version rather than a property. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

Core Data Models

1. Sensor Data Models
import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.Instant;
import java.util.Map;
/**
 * A single measurement reported by one sensor of one device.
 *
 * <p>Plain mutable bean: every property has a getter and a setter, and the no-arg constructor
 * leaves all reference fields {@code null}.
 */
public class SensorData {

    private String deviceId;
    private String sensorType;
    private double value;
    private String unit;
    private Map<String, Object> metadata;

    /** Serialized as a UTC string with millisecond precision. */
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")
    private Instant timestamp;

    private Location location;

    /** Creates an empty reading. */
    public SensorData() {
    }

    /**
     * Creates a reading with its core attributes; metadata and location stay unset.
     *
     * @param deviceId   identifier of the reporting device
     * @param sensorType kind of sensor that produced the value
     * @param value      measured value
     * @param unit       unit the value is expressed in
     * @param timestamp  instant attached to the reading
     */
    public SensorData(String deviceId, String sensorType, double value, String unit, Instant timestamp) {
        this.deviceId = deviceId;
        this.sensorType = sensorType;
        this.value = value;
        this.unit = unit;
        this.timestamp = timestamp;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public Map<String, Object> getMetadata() {
        return metadata;
    }

    public void setMetadata(Map<String, Object> metadata) {
        this.metadata = metadata;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public Location getLocation() {
        return location;
    }

    public void setLocation(Location location) {
        this.location = location;
    }

    /** Renders the core attributes; metadata and location are not included. */
    @Override
    public String toString() {
        return "SensorData{deviceId='%s', type='%s', value=%.2f, unit='%s', timestamp=%s}"
                .formatted(deviceId, sensorType, value, unit, timestamp);
    }
}
/**
 * Geographic position expressed as latitude and longitude, with an optional altitude.
 *
 * <p>Altitude is a boxed {@link Double} and is {@code null} when it was never supplied.
 */
public class Location {

    private double latitude;
    private double longitude;
    private Double altitude;

    /** Creates a location at (0.0, 0.0) with no altitude. */
    public Location() {
    }

    /**
     * Creates a location without altitude.
     *
     * @param latitude  latitude value
     * @param longitude longitude value
     */
    public Location(double latitude, double longitude) {
        this.latitude = latitude;
        this.longitude = longitude;
    }

    /**
     * Creates a location with all three coordinates.
     *
     * @param latitude  latitude value
     * @param longitude longitude value
     * @param altitude  altitude value, may be {@code null}
     */
    public Location(double latitude, double longitude, Double altitude) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.altitude = altitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public double getLongitude() {
        return longitude;
    }

    public void setLongitude(double longitude) {
        this.longitude = longitude;
    }

    public Double getAltitude() {
        return altitude;
    }

    public void setAltitude(Double altitude) {
        this.altitude = altitude;
    }
}
/**
 * A message carrying several sensor readings from one device at one timestamp,
 * together with the device's status and location.
 *
 * <p>Plain mutable bean; all fields are {@code null} until set.
 */
public class MultiSensorData {

    private String deviceId;
    private Instant timestamp;
    /** Readings held in a map with string keys. */
    private Map<String, SensorReading> readings;
    private DeviceStatus status;
    private Location location;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, SensorReading> getReadings() {
        return readings;
    }

    public void setReadings(Map<String, SensorReading> readings) {
        this.readings = readings;
    }

    public DeviceStatus getStatus() {
        return status;
    }

    public void setStatus(DeviceStatus status) {
        this.status = status;
    }

    public Location getLocation() {
        return location;
    }

    public void setLocation(Location location) {
        this.location = location;
    }
}
/**
 * One value inside a multi-sensor message, with its unit, optional min/max bounds
 * and a quality grade.
 *
 * <p>Plain mutable bean. {@code minValue}, {@code maxValue} and {@code quality} are
 * {@code null} until set.
 */
public class SensorReading {

    private double value;
    private String unit;
    private Double minValue;
    private Double maxValue;
    private SensorQuality quality;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public Double getMinValue() {
        return minValue;
    }

    public void setMinValue(Double minValue) {
        this.minValue = minValue;
    }

    public Double getMaxValue() {
        return maxValue;
    }

    public void setMaxValue(Double maxValue) {
        this.maxValue = maxValue;
    }

    /**
     * Returns the quality grade stored by {@link #setQuality(SensorQuality)}.
     *
     * <p>Previously this always returned {@code GOOD}, so a reported grade such as
     * {@code POOR} or {@code ERROR} was silently discarded.
     *
     * @return the stored quality, or {@code null} if none was set
     */
    public SensorQuality getQuality() {
        return quality;
    }

    public void setQuality(SensorQuality quality) {
        this.quality = quality;
    }
}
/** Quality grade that can be attached to a sensor reading. */
public enum SensorQuality {
    EXCELLENT,
    GOOD,
    FAIR,
    POOR,
    ERROR
}
/** Operational state reported for a device. */
public enum DeviceStatus {
    ONLINE,
    OFFLINE,
    DEGRADED,
    MAINTENANCE
}
2. Device Management Models
/**
 * Registry record describing one device: identity, hardware details, where it is
 * installed, its current status and the configuration of its sensors.
 *
 * <p>Plain mutable bean; all fields are {@code null} until set.
 */
public class IoTDevice {

    private String deviceId;
    private String deviceName;
    private String deviceType;
    private String manufacturer;
    private String firmwareVersion;
    private Location installationLocation;
    private Map<String, String> specifications;
    private DeviceStatus status;
    private Instant lastSeen;
    /** Per-sensor configuration held in a map with string keys. */
    private Map<String, SensorConfig> sensorConfigs;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getDeviceName() {
        return deviceName;
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public String getDeviceType() {
        return deviceType;
    }

    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }

    public String getManufacturer() {
        return manufacturer;
    }

    public void setManufacturer(String manufacturer) {
        this.manufacturer = manufacturer;
    }

    public String getFirmwareVersion() {
        return firmwareVersion;
    }

    public void setFirmwareVersion(String firmwareVersion) {
        this.firmwareVersion = firmwareVersion;
    }

    public Location getInstallationLocation() {
        return installationLocation;
    }

    public void setInstallationLocation(Location installationLocation) {
        this.installationLocation = installationLocation;
    }

    public Map<String, String> getSpecifications() {
        return specifications;
    }

    public void setSpecifications(Map<String, String> specifications) {
        this.specifications = specifications;
    }

    public DeviceStatus getStatus() {
        return status;
    }

    public void setStatus(DeviceStatus status) {
        this.status = status;
    }

    public Instant getLastSeen() {
        return lastSeen;
    }

    public void setLastSeen(Instant lastSeen) {
        this.lastSeen = lastSeen;
    }

    public Map<String, SensorConfig> getSensorConfigs() {
        return sensorConfigs;
    }

    public void setSensorConfigs(Map<String, SensorConfig> sensorConfigs) {
        this.sensorConfigs = sensorConfigs;
    }
}
/**
 * Configuration of one sensor: sampling rate, optional alert thresholds, unit,
 * retention period and an enabled flag.
 *
 * <p>Plain mutable bean. Thresholds are boxed and {@code null} until set.
 */
public class SensorConfig {

    private String sensorType;
    /** Sampling rate in Hz. */
    private double samplingRate;
    private Double minThreshold;
    private Double maxThreshold;
    private String unit;
    private int retentionDays;
    private boolean enabled;

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getSamplingRate() {
        return samplingRate;
    }

    public void setSamplingRate(double samplingRate) {
        this.samplingRate = samplingRate;
    }

    public Double getMinThreshold() {
        return minThreshold;
    }

    public void setMinThreshold(Double minThreshold) {
        this.minThreshold = minThreshold;
    }

    public Double getMaxThreshold() {
        return maxThreshold;
    }

    public void setMaxThreshold(Double maxThreshold) {
        this.maxThreshold = maxThreshold;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public int getRetentionDays() {
        return retentionDays;
    }

    public void setRetentionDays(int retentionDays) {
        this.retentionDays = retentionDays;
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}

Data Ingestion Layer

1. MQTT Client for Sensor Data
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * MQTT subscriber that decodes incoming JSON payloads and hands them to the
 * {@link SensorDataProcessor}.
 *
 * <p>Messages are routed by the last topic segment: {@code .../multi} carries a
 * {@code MultiSensorData}, {@code .../status} carries a {@code DeviceStatus}, and anything
 * else is treated as a single {@code SensorData} reading.
 */
@Component
public class MqttSensorClient implements MqttCallback {

    private static final String MULTI_SUFFIX = "/multi";
    private static final String STATUS_SUFFIX = "/status";

    private MqttClient mqttClient;
    private final ObjectMapper objectMapper;
    private final SensorDataProcessor dataProcessor;
    private final String brokerUrl;
    private final String clientId;

    /**
     * @param objectMapper  JSON mapper used to decode payloads
     * @param dataProcessor sink for decoded messages
     * @param brokerUrl     broker address, read from {@code mqtt.broker.url}
     */
    public MqttSensorClient(ObjectMapper objectMapper,
                            SensorDataProcessor dataProcessor,
                            @Value("${mqtt.broker.url}") String brokerUrl) {
        this.objectMapper = objectMapper;
        this.dataProcessor = dataProcessor;
        this.brokerUrl = brokerUrl;
        this.clientId = "sensor-processor-" + System.currentTimeMillis();
    }

    /**
     * Opens the broker connection and subscribes to the sensor and status topics.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    public void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        // NOTE(review): with a clean session the broker keeps no subscriptions across a
        // reconnect; confirm whether re-subscribing after an automatic reconnect is needed
        // (Paho offers MqttCallbackExtended.connectComplete for that).
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(60);
        mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        mqttClient.setCallback(this);
        mqttClient.connect(options);
        subscribeToTopics();
    }

    /** Subscribes to every handled topic filter at QoS 1. */
    private void subscribeToTopics() throws MqttException {
        String[] topics = {
            "sensors/+/temperature",
            "sensors/+/humidity",
            "sensors/+/pressure",
            "sensors/+/multi",
            "devices/+/status"
        };
        int[] qos = {1, 1, 1, 1, 1}; // Quality of Service level 1
        mqttClient.subscribe(topics, qos);
    }

    /**
     * Decodes one message and dispatches it by topic suffix.
     *
     * <p>Failures are logged and swallowed so that one bad message does not tear down
     * the subscription.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        try {
            // Decode explicitly as UTF-8; the platform default charset may differ per host.
            String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            // Match on the final segment only: contains() would also match a device id
            // that merely starts with "multi" or "status" (e.g. sensors/multi-7/temperature).
            if (topic.endsWith(MULTI_SUFFIX)) {
                MultiSensorData multiData = objectMapper.readValue(payload, MultiSensorData.class);
                dataProcessor.processMultiSensorData(multiData);
            } else if (topic.endsWith(STATUS_SUFFIX)) {
                handleDeviceStatus(topic, payload);
            } else {
                SensorData sensorData = objectMapper.readValue(payload, SensorData.class);
                dataProcessor.processSensorData(sensorData);
            }
        } catch (Exception e) {
            System.err.println("Error processing MQTT message: " + e.getMessage());
            // Could send to dead letter queue
        }
    }

    /** Parses a status payload and forwards it with the device id taken from the topic. */
    private void handleDeviceStatus(String topic, String payload) throws Exception {
        String deviceId = extractDeviceIdFromTopic(topic);
        DeviceStatus status = objectMapper.readValue(payload, DeviceStatus.class);
        dataProcessor.updateDeviceStatus(deviceId, status);
    }

    /**
     * Extracts the device id from a topic of the form {@code devices/{deviceId}/status}.
     *
     * @throws IllegalArgumentException if the topic has fewer than two segments or an empty id
     */
    private String extractDeviceIdFromTopic(String topic) {
        String[] parts = topic.split("/");
        if (parts.length < 2 || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Topic does not contain a device id: " + topic);
        }
        return parts[1];
    }

    @Override
    public void connectionLost(Throwable cause) {
        // Guard against a missing cause so the callback itself never throws.
        String reason = (cause != null) ? cause.getMessage() : null;
        System.err.println("MQTT Connection lost: " + reason);
        // Implement reconnection logic
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Not used for subscriptions
    }

    /**
     * Disconnects from the broker if a connection is currently open.
     *
     * @throws MqttException if the disconnect fails
     */
    public void disconnect() throws MqttException {
        if (mqttClient != null && mqttClient.isConnected()) {
            mqttClient.disconnect();
        }
    }
}
2. Kafka Consumer for High-Volume Data
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Kafka listeners that decode JSON sensor messages and forward them to the
 * {@link SensorDataProcessor}.
 *
 * <p>Decoding or processing failures are logged and swallowed in both listeners.
 */
@Component
public class KafkaSensorConsumer {

    private final SensorDataProcessor dataProcessor;
    private final ObjectMapper objectMapper;

    public KafkaSensorConsumer(SensorDataProcessor dataProcessor, ObjectMapper objectMapper) {
        this.dataProcessor = dataProcessor;
        this.objectMapper = objectMapper;
    }

    /**
     * Handles one single-reading message.
     *
     * @param message JSON representation of a {@code SensorData}
     * @param topic   topic the record was received from (currently unused)
     * @param key     record key, or {@code null} when the producer sent none (currently unused)
     */
    @KafkaListener(topics = "${kafka.topics.sensor-data}", groupId = "${kafka.consumer.group}")
    public void consumeSensorData(@Payload String message,
                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                  // The key header is absent for key-less records; a required
                                  // header would reject those records before this method runs.
                                  @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) String key) {
        try {
            SensorData sensorData = objectMapper.readValue(message, SensorData.class);
            dataProcessor.processSensorData(sensorData);
        } catch (Exception e) {
            System.err.println("Error processing Kafka message: " + e.getMessage());
            // Send to DLQ or error topic
        }
    }

    /**
     * Handles one batch message.
     *
     * @param message JSON array of {@code SensorData} objects
     */
    @KafkaListener(topics = "${kafka.topics.batch-sensor-data}", groupId = "${kafka.consumer.group}")
    public void consumeBatchSensorData(@Payload String message) {
        try {
            List<SensorData> batchData = objectMapper.readValue(message,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, SensorData.class));
            dataProcessor.processBatchSensorData(batchData);
        } catch (Exception e) {
            System.err.println("Error processing batch sensor data: " + e.getMessage());
        }
    }
}

Data Processing Engine

1. Core Sensor Data Processor
import org.springframework.stereotype.Component;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Central processing step for sensor readings: validates them, raises alerts,
 * maintains per-sensor statistics, stores them and forwards them to analytics.
 *
 * <p>Statistics and last-seen timestamps are kept in concurrent maps keyed by
 * {@code deviceId:sensorType} and {@code deviceId} respectively.
 */
@Component
public class SensorDataProcessor {

    /** Number of recent readings kept per sensor for statistics. */
    private static final int STATISTICS_WINDOW = 1000;
    /** Readings required (exclusive) before z-score anomaly detection starts. */
    private static final long MIN_SAMPLES_FOR_ANOMALY = 10;
    /** Absolute z-score above which a reading is reported as anomalous. */
    private static final double Z_SCORE_LIMIT = 3.0;
    /** Example high-temperature threshold. */
    private static final double HIGH_TEMPERATURE_LIMIT = 40.0;

    private final SensorDataRepository dataRepository;
    private final AlertService alertService;
    private final AnalyticsService analyticsService;
    private final Map<String, SensorStatistics> sensorStatistics;
    private final Map<String, Instant> deviceLastSeen;

    public SensorDataProcessor(SensorDataRepository dataRepository,
                               AlertService alertService,
                               AnalyticsService analyticsService) {
        this.dataRepository = dataRepository;
        this.alertService = alertService;
        this.analyticsService = analyticsService;
        this.sensorStatistics = new ConcurrentHashMap<>();
        this.deviceLastSeen = new ConcurrentHashMap<>();
    }

    /**
     * Runs the full pipeline for one reading. Invalid readings (including {@code null})
     * are logged and dropped.
     *
     * @param sensorData the reading to process
     */
    public void processSensorData(SensorData sensorData) {
        if (!validateSensorData(sensorData)) {
            System.err.println("Invalid sensor data: " + sensorData);
            return;
        }
        deviceLastSeen.put(sensorData.getDeviceId(), Instant.now());
        // Anomalies are checked before the statistics are updated so the new value is
        // compared against the history that preceded it.
        checkForAnomalies(sensorData);
        updateStatistics(sensorData);
        dataRepository.save(sensorData);
        analyticsService.processRealTimeData(sensorData);
    }

    /**
     * Splits a multi-sensor message into individual readings and processes each one.
     * A {@code null} message is ignored; a message without readings only updates the
     * device status; {@code null} entries in the readings map are skipped.
     *
     * @param multiData the multi-sensor message
     */
    public void processMultiSensorData(MultiSensorData multiData) {
        if (multiData == null) {
            return;
        }
        updateDeviceStatus(multiData.getDeviceId(), multiData.getStatus());
        Map<String, SensorReading> readings = multiData.getReadings();
        if (readings == null) {
            return;
        }
        readings.forEach((sensorType, reading) -> {
            if (reading == null) {
                return;
            }
            SensorData sensorData = new SensorData();
            sensorData.setDeviceId(multiData.getDeviceId());
            sensorData.setSensorType(sensorType);
            sensorData.setValue(reading.getValue());
            sensorData.setUnit(reading.getUnit());
            sensorData.setTimestamp(multiData.getTimestamp());
            sensorData.setLocation(multiData.getLocation());
            processSensorData(sensorData);
        });
    }

    /**
     * Stores and analyses a batch in bulk. Invalid entries are filtered out;
     * a {@code null} or empty batch is a no-op.
     *
     * @param batchData readings to process
     */
    public void processBatchSensorData(List<SensorData> batchData) {
        if (batchData == null || batchData.isEmpty()) {
            return;
        }
        List<SensorData> validData = batchData.stream()
                .filter(this::validateSensorData)
                .toList();
        dataRepository.saveAll(validData);
        analyticsService.processBatchData(validData);
    }

    /**
     * Checks that a reading is non-null, has a non-blank device id and sensor type,
     * a timestamp, and a finite value.
     */
    private boolean validateSensorData(SensorData sensorData) {
        if (sensorData == null) {
            return false;
        }
        if (sensorData.getDeviceId() == null || sensorData.getDeviceId().trim().isEmpty()) {
            return false;
        }
        if (sensorData.getSensorType() == null || sensorData.getSensorType().trim().isEmpty()) {
            return false;
        }
        if (sensorData.getTimestamp() == null) {
            return false;
        }
        return !Double.isNaN(sensorData.getValue()) && !Double.isInfinite(sensorData.getValue());
    }

    /** Raises an anomaly alert when the reading's z-score exceeds the limit, then checks thresholds. */
    private void checkForAnomalies(SensorData sensorData) {
        SensorStatistics stats = sensorStatistics.get(sensorKey(sensorData.getDeviceId(), sensorData.getSensorType()));
        if (stats != null && stats.getDataCount() > MIN_SAMPLES_FOR_ANOMALY) {
            double stdDev = stats.getStandardDeviation();
            // A zero (or non-finite) deviation makes the z-score Infinity/NaN; skip the
            // check instead of alerting with a meaningless score.
            if (stdDev > 0.0 && !Double.isInfinite(stdDev)) {
                double zScore = Math.abs((sensorData.getValue() - stats.getMean()) / stdDev);
                if (zScore > Z_SCORE_LIMIT) {
                    String message = String.format("Anomalous reading detected: %.2f (Z-score: %.2f)",
                            sensorData.getValue(), zScore);
                    alertService.sendAlert(buildAlert(sensorData, AlertType.ANOMALY, message));
                }
            }
        }
        checkThresholdViolations(sensorData);
    }

    /**
     * Raises an alert when a fixed threshold is exceeded.
     * This would typically check against configured thresholds for the device/sensor.
     */
    private void checkThresholdViolations(SensorData sensorData) {
        if ("temperature".equals(sensorData.getSensorType())
                && sensorData.getValue() > HIGH_TEMPERATURE_LIMIT) {
            String message = String.format("High temperature alert: %.2f°C", sensorData.getValue());
            alertService.sendAlert(buildAlert(sensorData, AlertType.THRESHOLD_VIOLATION, message));
        }
    }

    /** Creates an alert pre-filled from the reading that triggered it. */
    private Alert buildAlert(SensorData sensorData, AlertType type, String message) {
        Alert alert = new Alert();
        alert.setDeviceId(sensorData.getDeviceId());
        alert.setSensorType(sensorData.getSensorType());
        alert.setValue(sensorData.getValue());
        alert.setTimestamp(sensorData.getTimestamp());
        alert.setType(type);
        alert.setMessage(message);
        return alert;
    }

    /** Adds the reading's value to the rolling statistics of its sensor. */
    private void updateStatistics(SensorData sensorData) {
        sensorStatistics
                .computeIfAbsent(sensorKey(sensorData.getDeviceId(), sensorData.getSensorType()),
                        k -> new SensorStatistics(STATISTICS_WINDOW))
                .addValue(sensorData.getValue());
    }

    /** Builds the map key used for per-sensor state. */
    private static String sensorKey(String deviceId, String sensorType) {
        return deviceId + ":" + sensorType;
    }

    /**
     * Placeholder: intentionally does nothing yet.
     *
     * @param deviceId device whose status changed
     * @param status   new status
     */
    public void updateDeviceStatus(String deviceId, DeviceStatus status) {
        // Update device status in database
        // This could also trigger alerts for device offline/online status changes
    }

    /**
     * @return the statistics for the given sensor, or {@code null} if no reading was processed for it
     */
    public SensorStatistics getSensorStatistics(String deviceId, String sensorType) {
        return sensorStatistics.get(sensorKey(deviceId, sensorType));
    }

    /** @return a read-only view of all per-sensor statistics */
    public Map<String, SensorStatistics> getAllStatistics() {
        return Collections.unmodifiableMap(sensorStatistics);
    }
}
2. Sensor Statistics Calculator
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Lock-guarded wrapper around a windowed {@link DescriptiveStatistics}.
 *
 * <p>Every operation acquires the same {@link ReentrantLock} before touching the
 * underlying statistics object and releases it in a {@code finally} block.
 */
public class SensorStatistics {

    private final DescriptiveStatistics stats;
    private final ReentrantLock lock;
    private final int windowSize;

    /**
     * @param windowSize window size passed to the underlying {@link DescriptiveStatistics}
     */
    public SensorStatistics(int windowSize) {
        this.windowSize = windowSize;
        this.lock = new ReentrantLock();
        this.stats = new DescriptiveStatistics(windowSize);
    }

    /** Adds one value to the window. */
    public void addValue(double value) {
        lock.lock();
        try {
            stats.addValue(value);
        } finally {
            lock.unlock();
        }
    }

    /** @return mean of the values currently in the window */
    public double getMean() {
        final double mean;
        lock.lock();
        try {
            mean = stats.getMean();
        } finally {
            lock.unlock();
        }
        return mean;
    }

    /** @return standard deviation of the values currently in the window */
    public double getStandardDeviation() {
        final double deviation;
        lock.lock();
        try {
            deviation = stats.getStandardDeviation();
        } finally {
            lock.unlock();
        }
        return deviation;
    }

    /** @return smallest value currently in the window */
    public double getMin() {
        final double smallest;
        lock.lock();
        try {
            smallest = stats.getMin();
        } finally {
            lock.unlock();
        }
        return smallest;
    }

    /** @return largest value currently in the window */
    public double getMax() {
        final double largest;
        lock.lock();
        try {
            largest = stats.getMax();
        } finally {
            lock.unlock();
        }
        return largest;
    }

    /**
     * @param percentile requested percentile, forwarded unchanged to the underlying statistics
     * @return the estimated percentile value
     */
    public double getPercentile(double percentile) {
        final double estimate;
        lock.lock();
        try {
            estimate = stats.getPercentile(percentile);
        } finally {
            lock.unlock();
        }
        return estimate;
    }

    /** @return number of values currently held */
    public long getDataCount() {
        final long count;
        lock.lock();
        try {
            count = stats.getN();
        } finally {
            lock.unlock();
        }
        return count;
    }

    /** Discards all stored values. */
    public void clear() {
        lock.lock();
        try {
            stats.clear();
        } finally {
            lock.unlock();
        }
    }
}

Data Storage Layer

1. Time-Series Database (InfluxDB)
import com.influxdb.client.*;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
/**
 * InfluxDB-backed store for sensor readings.
 *
 * <p>Each reading becomes one point in the {@code sensor_data} measurement, tagged with
 * device id and sensor type (and latitude/longitude when a location is present), with
 * {@code value} and {@code unit} fields and a millisecond-precision timestamp.
 */
@Repository
public class InfluxDbSensorRepository {

    private final WriteApiBlocking writeApi;
    private final QueryApi queryApi;
    private final String bucket;
    private final String org;

    /**
     * @param influxDBClient client used to obtain the blocking write API and the query API
     * @param bucket         target bucket, read from {@code influxdb.bucket}
     * @param org            organisation, read from {@code influxdb.org}
     */
    public InfluxDbSensorRepository(InfluxDBClient influxDBClient,
                                    @Value("${influxdb.bucket}") String bucket,
                                    @Value("${influxdb.org}") String org) {
        this.writeApi = influxDBClient.getWriteApiBlocking();
        this.queryApi = influxDBClient.getQueryApi();
        this.bucket = bucket;
        this.org = org;
    }

    /** Writes a single reading. */
    public void save(SensorData sensorData) {
        // Shares the point mapping with saveAll so both write paths stay in sync.
        writeApi.writePoint(bucket, org, convertToPoint(sensorData));
    }

    /** Writes all readings in one call. */
    public void saveAll(List<SensorData> sensorDataList) {
        List<Point> points = sensorDataList.stream()
                .map(this::convertToPoint)
                .toList();
        writeApi.writePoints(bucket, org, points);
    }

    /** Maps a reading to an InfluxDB point. */
    private Point convertToPoint(SensorData sensorData) {
        Point point = Point.measurement("sensor_data")
                .addTag("device_id", sensorData.getDeviceId())
                .addTag("sensor_type", sensorData.getSensorType())
                .addField("value", sensorData.getValue())
                .addField("unit", sensorData.getUnit())
                .time(sensorData.getTimestamp().toEpochMilli(), WritePrecision.MS);
        if (sensorData.getLocation() != null) {
            point.addTag("latitude", String.valueOf(sensorData.getLocation().getLatitude()))
                    .addTag("longitude", String.valueOf(sensorData.getLocation().getLongitude()));
        }
        return point;
    }

    /**
     * Queries the {@code value} field of one sensor within a time range.
     *
     * <p>{@code deviceId} and {@code sensorType} may originate from request paths, so they
     * are escaped before being embedded in the Flux string literals.
     *
     * @param deviceId   device to query
     * @param sensorType sensor type to query
     * @param startTime  range start, rendered in ISO-8601 form
     * @param endTime    range stop, rendered in ISO-8601 form
     * @return rows mapped to {@code SensorData} by the client
     */
    public List<SensorData> queryData(String deviceId, String sensorType,
                                      Instant startTime, Instant endTime) {
        String fluxQuery = String.format(
                "from(bucket: \"%s\") " +
                "|> range(start: %s, stop: %s) " +
                "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
                "|> filter(fn: (r) => r.device_id == \"%s\") " +
                "|> filter(fn: (r) => r.sensor_type == \"%s\") " +
                "|> filter(fn: (r) => r._field == \"value\")",
                bucket, startTime, endTime, escapeFluxString(deviceId), escapeFluxString(sensorType));
        // NOTE(review): mapping rows onto SensorData presumably relies on the client's POJO
        // mapping conventions; confirm that SensorData's property names line up.
        return queryApi.query(fluxQuery, org, SensorData.class);
    }

    /**
     * Escapes a value for use inside a double-quoted Flux string literal: backslashes,
     * double quotes and the {@code ${} interpolation opener are prefixed with a backslash.
     */
    private static String escapeFluxString(String raw) {
        if (raw == null) {
            return "";
        }
        return raw.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("${", "\\${");
    }
}
2. MongoDB Repository for Device Management
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
/**
 * MongoDB repository for {@code IoTDevice} documents with {@code String} ids.
 *
 * <p>Methods without {@code @Query} carry no explicit query; presumably Spring Data derives
 * it from the method name.
 */
@Repository
public interface DeviceRepository extends MongoRepository<IoTDevice, String> {
/** Looks up a single device by its {@code deviceId} property. */
Optional<IoTDevice> findByDeviceId(String deviceId);
/** Lists devices with the given {@code deviceType}. */
List<IoTDevice> findByDeviceType(String deviceType);
/** Lists devices with the given {@code status}. */
List<IoTDevice> findByStatus(DeviceStatus status);
/**
 * Lists devices whose installation location lies within a latitude range and a
 * longitude range (a bounding box).
 */
List<IoTDevice> findByInstallationLocationLatitudeBetweenAndInstallationLocationLongitudeBetween(
double minLat, double maxLat, double minLon, double maxLon);
/** Lists devices whose {@code lastSeen} is earlier than the given timestamp. */
@Query("{ 'lastSeen': { $lt: ?0 } }")
List<IoTDevice> findDevicesOfflineSince(Instant timestamp);
/**
 * Lists devices whose {@code sensorConfigs} entry for the given sensor type is enabled.
 * NOTE(review): the {@code ?0} placeholder sits inside a field path rather than a value;
 * verify that Spring Data MongoDB binds parameters in that position.
 */
@Query("{ 'sensorConfigs.?0.enabled': true }")
List<IoTDevice> findDevicesWithSensorType(String sensorType);
}

Analytics and Monitoring

1. Real-time Analytics Service
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Streaming and batch analytics over sensor readings.
 *
 * <p>Keeps one rolling window and one trend analysis per {@code deviceId:sensorType}
 * key in concurrent maps.
 */
@Service
public class AnalyticsService {

    private final Map<String, RollingWindow> rollingWindows;
    private final Map<String, TrendAnalysis> trendAnalyses;

    public AnalyticsService() {
        this.rollingWindows = new ConcurrentHashMap<>();
        this.trendAnalyses = new ConcurrentHashMap<>();
    }

    /**
     * Feeds one reading into its rolling window, trend analysis and pattern detection.
     *
     * @param sensorData the reading to analyse
     */
    public void processRealTimeData(SensorData sensorData) {
        final String key = sensorData.getDeviceId() + ":" + sensorData.getSensorType();

        // A 10-minute window is created the first time a key is seen.
        rollingWindows
                .computeIfAbsent(key, unused -> new RollingWindow(Duration.ofMinutes(10)))
                .addDataPoint(sensorData);

        performTrendAnalysis(sensorData, key);
        detectPatterns(sensorData, key);
    }

    /**
     * Groups a batch by {@code deviceId:sensorType} and analyses each group.
     *
     * @param batchData readings to analyse
     */
    public void processBatchData(List<SensorData> batchData) {
        Map<String, List<SensorData>> byKey = new HashMap<>();
        for (SensorData reading : batchData) {
            String groupKey = reading.getDeviceId() + ":" + reading.getSensorType();
            byKey.computeIfAbsent(groupKey, unused -> new ArrayList<>()).add(reading);
        }
        for (Map.Entry<String, List<SensorData>> group : byKey.entrySet()) {
            performBatchAnalysis(group.getKey(), group.getValue());
        }
    }

    /** Adds the reading to the key's trend (last 100 points) and reports a significant slope. */
    private void performTrendAnalysis(SensorData sensorData, String key) {
        TrendAnalysis trend = trendAnalyses.computeIfAbsent(key, unused -> new TrendAnalysis(100));
        trend.addDataPoint(sensorData);

        if (!trend.hasEnoughData()) {
            return;
        }
        double slope = trend.calculateSlope();
        if (Math.abs(slope) <= 0.1) { // below the significant-trend threshold
            return;
        }
        System.out.printf("Significant trend detected for %s: slope=%.3f%n", key, slope);
        // Could trigger alerts or adaptive sampling
        System.out.println(slope > 0 ? "Increasing trend detected" : "Decreasing trend detected");
    }

    /** Placeholder: intentionally empty. */
    private void detectPatterns(SensorData sensorData, String key) {
        // Implement pattern detection logic
        // This could include:
        // - Seasonal patterns
        // - Spike detection
        // - Correlation between different sensors
    }

    /** Prints count, mean and standard deviation for one group of readings. */
    private void performBatchAnalysis(String key, List<SensorData> data) {
        DescriptiveStatistics summary = new DescriptiveStatistics();
        for (SensorData reading : data) {
            summary.addValue(reading.getValue());
        }
        System.out.printf("Batch Analysis for %s: count=%d, mean=%.2f, std=%.2f%n",
                key, summary.getN(), summary.getMean(), summary.getStandardDeviation());
    }

    /**
     * @return the rolling window for the given sensor, or {@code null} if none exists yet
     */
    public RollingWindow getRollingWindow(String deviceId, String sensorType) {
        return rollingWindows.get(deviceId + ":" + sensorType);
    }
}
/**
 * Time-bounded buffer of recent readings, additionally capped at 1000 entries.
 *
 * <p>Eviction only looks at the oldest (first) entry and is driven by the wall clock
 * at insertion time.
 */
public class RollingWindow {

    private final LinkedList<SensorData> dataPoints;
    private final Duration windowDuration;
    private final int maxPoints;

    /**
     * @param windowDuration how far back from "now" readings are retained
     */
    public RollingWindow(Duration windowDuration) {
        this.windowDuration = windowDuration;
        this.maxPoints = 1000; // Prevent memory issues
        this.dataPoints = new LinkedList<>();
    }

    /**
     * Appends a reading, then drops leading readings older than the window and,
     * after that, leading readings beyond the size cap.
     */
    public void addDataPoint(SensorData dataPoint) {
        dataPoints.addLast(dataPoint);

        final Instant cutoff = Instant.now().minus(windowDuration);

        // Pass 1: age-based eviction from the head.
        while (!dataPoints.isEmpty()) {
            if (!dataPoints.getFirst().getTimestamp().isBefore(cutoff)) {
                break;
            }
            dataPoints.removeFirst();
        }

        // Pass 2: size-based eviction from the head.
        for (int excess = dataPoints.size() - maxPoints; excess > 0; excess--) {
            dataPoints.removeFirst();
        }
    }

    /** @return a copy of the buffered readings, oldest first */
    public List<SensorData> getDataPoints() {
        return new ArrayList<>(dataPoints);
    }

    /** @return number of buffered readings */
    public int getSize() {
        return dataPoints.size();
    }

    /** @return the mean of the buffered values, or empty when nothing is buffered */
    public Optional<Double> getAverage() {
        int count = dataPoints.size();
        if (count == 0) {
            return Optional.empty();
        }
        double total = dataPoints.stream()
                .mapToDouble(point -> point.getValue())
                .sum();
        return Optional.of(total / count);
    }
}
/**
 * Keeps the most recent readings of one sensor and estimates their linear trend.
 */
public class TrendAnalysis {

    private final LinkedList<SensorData> dataPoints;
    private final int maxPoints;

    /**
     * @param maxPoints maximum number of readings retained
     */
    public TrendAnalysis(int maxPoints) {
        this.dataPoints = new LinkedList<>();
        this.maxPoints = maxPoints;
    }

    /** Appends a reading and drops the oldest one once the capacity is exceeded. */
    public void addDataPoint(SensorData dataPoint) {
        dataPoints.addLast(dataPoint);
        if (dataPoints.size() > maxPoints) {
            dataPoints.removeFirst();
        }
    }

    /** @return {@code true} once at least 10 readings are held */
    public boolean hasEnoughData() {
        return dataPoints.size() >= 10; // Minimum points for trend analysis
    }

    /**
     * Computes the least-squares slope of value against insertion index (0, 1, 2, ...).
     *
     * @return the slope per index step, or {@code 0.0} when fewer than two readings are held
     */
    public double calculateSlope() {
        int n = dataPoints.size();
        if (n < 2) {
            return 0.0;
        }
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
        // Iterate rather than call get(i): indexed access on a LinkedList walks the list
        // each time, which made this loop quadratic.
        int index = 0;
        for (SensorData point : dataPoints) {
            double x = index++; // Time index
            double y = point.getValue();
            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumX2 += x * x;
        }
        return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
    }
}
2. Alert Management System
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory alert store and dispatcher.
 *
 * <p>Alerts are grouped per device id and every registered {@link AlertListener} is
 * notified of each new alert. The per-device lists and the listener list are
 * copy-on-write collections, so alerts may be sent, read and pruned from different
 * threads without external locking.
 */
@Service
public class AlertService {

    private final Map<String, List<Alert>> deviceAlerts;
    private final List<AlertListener> listeners;

    public AlertService() {
        this.deviceAlerts = new ConcurrentHashMap<>();
        // Plain ArrayLists inside a concurrent map are still unsafe when several
        // ingestion threads add alerts while another one reads or prunes them.
        this.listeners = new java.util.concurrent.CopyOnWriteArrayList<>();
    }

    /**
     * Stores the alert, notifies all listeners and logs the message.
     *
     * @param alert the alert to publish; it and its device id must not be {@code null}
     * @throws NullPointerException if {@code alert} or its device id is {@code null}
     */
    public void sendAlert(Alert alert) {
        Objects.requireNonNull(alert, "alert");
        String deviceId = Objects.requireNonNull(alert.getDeviceId(), "alert.deviceId");
        deviceAlerts
                .computeIfAbsent(deviceId, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
                .add(alert);
        notifyListeners(alert);
        System.out.println("ALERT: " + alert.getMessage());
    }

    /** Registers a listener that will receive all subsequent alerts. */
    public void addListener(AlertListener listener) {
        listeners.add(listener);
    }

    /** Calls every listener; a failing listener is logged and does not stop the others. */
    private void notifyListeners(Alert alert) {
        for (AlertListener listener : listeners) {
            try {
                listener.onAlert(alert);
            } catch (Exception e) {
                System.err.println("Error notifying alert listener: " + e.getMessage());
            }
        }
    }

    /**
     * @param deviceId device to look up
     * @param since    exclusive lower bound on the alert timestamp
     * @return the device's alerts raised strictly after {@code since}; empty if none
     */
    public List<Alert> getAlertsForDevice(String deviceId, Instant since) {
        return deviceAlerts.getOrDefault(deviceId, List.of()).stream()
                .filter(alert -> alert.getTimestamp().isAfter(since))
                .toList();
    }

    /** Removes every stored alert whose timestamp is before {@code cutoff}. */
    public void clearOldAlerts(Instant cutoff) {
        for (List<Alert> alerts : deviceAlerts.values()) {
            alerts.removeIf(alert -> alert.getTimestamp().isBefore(cutoff));
        }
    }
}
/**
 * An alert raised for a device/sensor: the offending value, when it occurred,
 * its type, a human-readable message and a severity.
 *
 * <p>Plain mutable bean; reference fields are {@code null} until set.
 */
public class Alert {

    private String deviceId;
    private String sensorType;
    private double value;
    private Instant timestamp;
    private AlertType type;
    private String message;
    private AlertSeverity severity;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public AlertType getType() {
        return type;
    }

    public void setType(AlertType type) {
        this.type = type;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public AlertSeverity getSeverity() {
        return severity;
    }

    public void setSeverity(AlertSeverity severity) {
        this.severity = severity;
    }
}
/** Category of an alert. */
public enum AlertType {
    THRESHOLD_VIOLATION,
    ANOMALY,
    DEVICE_OFFLINE,
    DEVICE_ERROR,
    MAINTENANCE_REQUIRED
}
/** Severity level of an alert, declared from lowest to highest. */
public enum AlertSeverity {
    LOW,
    MEDIUM,
    HIGH,
    CRITICAL
}
/**
 * Callback for receiving alerts.
 */
public interface AlertListener {
/**
 * Called with an alert.
 *
 * @param alert the alert being delivered
 */
void onAlert(Alert alert);
}

REST API Controllers

1. Sensor Data API
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import java.time.Instant;
import java.util.List;
/**
 * REST controller exposing sensor ingestion and query endpoints under {@code /api/sensors}.
 */
@RestController
@RequestMapping("/api/sensors")
public class SensorDataController {

    private final SensorDataProcessor dataProcessor;
    private final SensorDataRepository dataRepository;
    private final AnalyticsService analyticsService;

    public SensorDataController(SensorDataProcessor dataProcessor,
                                SensorDataRepository dataRepository,
                                AnalyticsService analyticsService) {
        this.dataProcessor = dataProcessor;
        this.dataRepository = dataRepository;
        this.analyticsService = analyticsService;
    }

    /**
     * Accepts one reading, hands it to the processor and echoes it back.
     *
     * <p>A timestamp already present on the payload is preserved; the server clock is used
     * only when the payload carries none.
     *
     * @param sensorData the reading from the request body
     * @return 200 with the (possibly timestamp-completed) reading
     */
    @PostMapping("/data")
    public ResponseEntity<SensorData> receiveSensorData(@RequestBody SensorData sensorData) {
        ensureTimestamp(sensorData);
        dataProcessor.processSensorData(sensorData);
        return ResponseEntity.ok(sensorData);
    }

    /**
     * Accepts a list of readings and hands the whole list to the processor.
     *
     * <p>Readings without a timestamp get the server time, matching the single-reading
     * endpoint.
     *
     * @param sensorDataList the readings from the request body
     * @return 200 with a short text summary containing the number of readings
     */
    @PostMapping("/data/batch")
    public ResponseEntity<String> receiveBatchSensorData(@RequestBody List<SensorData> sensorDataList) {
        sensorDataList.forEach(SensorDataController::ensureTimestamp);
        dataProcessor.processBatchSensorData(sensorDataList);
        return ResponseEntity.ok("Processed " + sensorDataList.size() + " data points");
    }

    /**
     * Queries the repository for one device/sensor pair between two instants.
     *
     * @param deviceId   device identifier from the path
     * @param sensorType sensor type from the path
     * @param startTime  start of the query range (request parameter)
     * @param endTime    end of the query range (request parameter)
     * @return 200 with whatever the repository returns
     */
    @GetMapping("/data/{deviceId}/{sensorType}")
    public ResponseEntity<List<SensorData>> getSensorData(
            @PathVariable String deviceId,
            @PathVariable String sensorType,
            @RequestParam Instant startTime,
            @RequestParam Instant endTime) {
        List<SensorData> data = dataRepository.queryData(deviceId, sensorType, startTime, endTime);
        return ResponseEntity.ok(data);
    }

    /** Returns the processor's statistics for one device/sensor pair. */
    @GetMapping("/statistics/{deviceId}/{sensorType}")
    public ResponseEntity<SensorStatistics> getSensorStatistics(
            @PathVariable String deviceId,
            @PathVariable String sensorType) {
        SensorStatistics stats = dataProcessor.getSensorStatistics(deviceId, sensorType);
        return ResponseEntity.ok(stats);
    }

    /** Returns the analytics service's rolling window for one device/sensor pair. */
    @GetMapping("/analytics/{deviceId}/{sensorType}")
    public ResponseEntity<RollingWindow> getAnalyticsData(
            @PathVariable String deviceId,
            @PathVariable String sensorType) {
        RollingWindow window = analyticsService.getRollingWindow(deviceId, sensorType);
        return ResponseEntity.ok(window);
    }

    /**
     * Fills in the server time when a reading has no timestamp, leaving an existing one alone.
     * NOTE(review): relies on SensorData exposing getTimestamp() alongside setTimestamp().
     */
    private static void ensureTimestamp(SensorData reading) {
        if (reading != null && reading.getTimestamp() == null) {
            reading.setTimestamp(Instant.now());
        }
    }
}
2. Device Management API
/**
 * REST controller for device registration, lookup, alert retrieval and sensor configuration
 * under {@code /api/devices}.
 */
@RestController
@RequestMapping("/api/devices")
public class DeviceController {

    private final DeviceRepository deviceRepository;
    private final AlertService alertService;

    public DeviceController(DeviceRepository deviceRepository, AlertService alertService) {
        this.deviceRepository = deviceRepository;
        this.alertService = alertService;
    }

    /** Saves the posted device and returns the saved instance. */
    @PostMapping
    public ResponseEntity<IoTDevice> registerDevice(@RequestBody IoTDevice device) {
        IoTDevice savedDevice = deviceRepository.save(device);
        return ResponseEntity.ok(savedDevice);
    }

    /** Returns the device with the given id, or 404 when the repository has none. */
    @GetMapping("/{deviceId}")
    public ResponseEntity<IoTDevice> getDevice(@PathVariable String deviceId) {
        Optional<IoTDevice> device = deviceRepository.findByDeviceId(deviceId);
        return device.map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    /**
     * Returns the alerts raised for a device during the last {@code hours} hours.
     *
     * @param deviceId device identifier from the path
     * @param hours    look-back window in hours; defaults to 1
     * @return 200 with the alerts newer than {@code now - hours}
     */
    @GetMapping("/{deviceId}/alerts")
    public ResponseEntity<List<Alert>> getDeviceAlerts(
            @PathVariable String deviceId,
            @RequestParam(defaultValue = "1") int hours) {
        // Multiply as long: int arithmetic would wrap for large hour counts and could move
        // the cutoff into the future.
        Instant since = Instant.now().minusSeconds(hours * 3600L);
        List<Alert> alerts = alertService.getAlertsForDevice(deviceId, since);
        return ResponseEntity.ok(alerts);
    }

    /** Returns every device the repository reports for the given status. */
    @GetMapping("/status/{status}")
    public ResponseEntity<List<IoTDevice>> getDevicesByStatus(@PathVariable DeviceStatus status) {
        List<IoTDevice> devices = deviceRepository.findByStatus(status);
        return ResponseEntity.ok(devices);
    }

    /**
     * Replaces the sensor configuration map of an existing device.
     *
     * @param deviceId      device identifier from the path
     * @param sensorConfigs new configuration map from the request body
     * @return 200 with the saved device, or 404 when the device does not exist
     */
    @PutMapping("/{deviceId}/config")
    public ResponseEntity<IoTDevice> updateDeviceConfig(
            @PathVariable String deviceId,
            @RequestBody Map<String, SensorConfig> sensorConfigs) {
        Optional<IoTDevice> deviceOpt = deviceRepository.findByDeviceId(deviceId);
        if (deviceOpt.isEmpty()) {
            return ResponseEntity.notFound().build();
        }
        IoTDevice device = deviceOpt.get();
        device.setSensorConfigs(sensorConfigs);
        IoTDevice updatedDevice = deviceRepository.save(device);
        return ResponseEntity.ok(updatedDevice);
    }
}

WebSocket for Real-time Updates

import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * WebSocket handler that keeps track of connected sessions and pushes alerts and sensor
 * readings to all of them as JSON text messages.
 *
 * <p>It registers itself with the {@code AlertService} on construction, so every alert
 * delivered to {@link #onAlert(Alert)} is broadcast.
 */
@Component
public class SensorWebSocketHandler extends TextWebSocketHandler implements AlertListener {

    private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
    private final ObjectMapper objectMapper;

    public SensorWebSocketHandler(ObjectMapper objectMapper, AlertService alertService) {
        this.objectMapper = objectMapper;
        alertService.addListener(this);
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        System.out.println("WebSocket connection established: " + session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        System.out.println("WebSocket connection closed: " + session.getId());
    }

    @Override
    public void onAlert(Alert alert) {
        // Broadcast alert to all connected clients
        broadcastMessage("ALERT", alert);
    }

    /** Broadcasts one sensor reading to all connected clients. */
    public void broadcastSensorData(SensorData sensorData) {
        broadcastMessage("SENSOR_DATA", sensorData);
    }

    /**
     * Serialises {@code {type, data, timestamp}} once and sends it to every open session.
     *
     * @param type message discriminator placed under the {@code "type"} key
     * @param data payload placed under the {@code "data"} key
     */
    private void broadcastMessage(String type, Object data) {
        Map<String, Object> message = new HashMap<>();
        message.put("type", type);
        message.put("data", data);
        // Fully qualified: java.time.Instant is not among this snippet's imports.
        message.put("timestamp", java.time.Instant.now());

        String jsonMessage;
        try {
            jsonMessage = objectMapper.writeValueAsString(message);
        } catch (IOException e) {
            System.err.println("Error serializing WebSocket message: " + e.getMessage());
            return;
        }

        TextMessage frame = new TextMessage(jsonMessage);
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            try {
                // Alerts and sensor data can be broadcast from different threads, and a
                // session must not receive overlapping sends, so sends are serialised per
                // session.
                synchronized (session) {
                    session.sendMessage(frame);
                }
            } catch (IOException | IllegalStateException e) {
                // A session that closed after the isOpen() check must not stop the
                // broadcast to the remaining sessions.
                System.err.println("Error sending WebSocket message: " + e.getMessage());
            }
        }
    }
}

Configuration Classes

1. Application Configuration
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the shared JSON mapper and the MQTT sensor client.
 */
@Configuration
public class AppConfig {

    /**
     * Builds an {@link ObjectMapper} with the {@code JavaTimeModule} registered.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper().registerModule(new JavaTimeModule());
    }

    /**
     * Builds the MQTT client from the mapper, the data processor and the broker URL read
     * from the {@code mqtt.broker.url} property.
     */
    @Bean
    public MqttSensorClient mqttSensorClient(ObjectMapper objectMapper,
                                             SensorDataProcessor dataProcessor,
                                             @Value("${mqtt.broker.url}") String brokerUrl) {
        MqttSensorClient client = new MqttSensorClient(objectMapper, dataProcessor, brokerUrl);
        return client;
    }
}
2. Spring Boot Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point; scheduling is enabled so {@code @Scheduled} methods are run.
 */
@SpringBootApplication
@EnableScheduling
public class IoTApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(IoTApplication.class, args);
    }
}

Usage Examples

1. Simulating Sensor Data
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.Random;
/**
 * Scheduled component that feeds randomly generated readings into the
 * {@code SensorDataProcessor}.
 */
@Component
public class SensorDataSimulator {

    private final SensorDataProcessor dataProcessor;
    private final Random random = new Random();

    public SensorDataSimulator(SensorDataProcessor dataProcessor) {
        this.dataProcessor = dataProcessor;
    }

    /** Emits one temperature reading for {@code temp-sensor-001}. */
    @Scheduled(fixedRate = 5000) // Every 5 seconds
    public void simulateTemperatureSensor() {
        SensorData data = new SensorData();
        data.setDeviceId("temp-sensor-001");
        data.setSensorType("temperature");
        data.setValue(gaussian(20, 5)); // 20°C ± 5°C
        data.setUnit("°C");
        data.setTimestamp(Instant.now());
        dataProcessor.processSensorData(data);
    }

    /** Emits temperature, humidity and pressure readings for {@code multi-sensor-001}. */
    @Scheduled(fixedRate = 10000) // Every 10 seconds
    public void simulateMultiSensorDevice() {
        MultiSensorData multiData = new MultiSensorData();
        multiData.setDeviceId("multi-sensor-001");
        multiData.setTimestamp(Instant.now());
        multiData.setStatus(DeviceStatus.ONLINE);

        // Fully qualified: java.util.Map/HashMap are not among this snippet's imports.
        java.util.Map<String, SensorReading> readings = new java.util.HashMap<>();
        readings.put("temperature", reading(22.5, 2, "°C"));
        readings.put("humidity", reading(45, 10, "%"));
        readings.put("pressure", reading(1013, 5, "hPa"));

        multiData.setReadings(readings);
        dataProcessor.processMultiSensorData(multiData);
    }

    /** Draws one sample as {@code mean + nextGaussian() * stdDev}. */
    private double gaussian(double mean, double stdDev) {
        return mean + random.nextGaussian() * stdDev;
    }

    /** Builds a reading with a Gaussian-distributed value and the given unit. */
    private SensorReading reading(double mean, double stdDev, String unit) {
        SensorReading sample = new SensorReading();
        sample.setValue(gaussian(mean, stdDev));
        sample.setUnit(unit);
        return sample;
    }
}
2. Monitoring Dashboard
/**
 * REST controller serving the dashboard overview under {@code /api/dashboard}.
 */
@RestController
@RequestMapping("/api/dashboard")
public class DashboardController {

    private final SensorDataProcessor dataProcessor;
    private final DeviceRepository deviceRepository;
    private final AnalyticsService analyticsService;

    public DashboardController(SensorDataProcessor dataProcessor,
                               DeviceRepository deviceRepository,
                               AnalyticsService analyticsService) {
        this.dataProcessor = dataProcessor;
        this.deviceRepository = deviceRepository;
        this.analyticsService = analyticsService;
    }

    /**
     * Builds the summary: device counts from the repository, a fixed system status and the
     * current time. Every device that is not ONLINE is counted as offline. The active-alert
     * count is not populated here.
     *
     * @return 200 with the populated summary
     */
    @GetMapping("/summary")
    public ResponseEntity<DashboardSummary> getDashboardSummary() {
        // Get device counts
        long total = deviceRepository.count();
        long online = deviceRepository.findByStatus(DeviceStatus.ONLINE).size();

        DashboardSummary overview = new DashboardSummary();
        overview.setTotalDevices(total);
        overview.setOnlineDevices(online);
        overview.setOfflineDevices(total - online);

        // Get recent alerts count
        // This would typically query the alert service

        // Get system status
        overview.setSystemStatus("OPERATIONAL");
        overview.setLastUpdated(Instant.now());
        return ResponseEntity.ok(overview);
    }
}
/**
 * Mutable bean returned by the dashboard summary endpoint: device counts, active-alert
 * count, a status string and the time the summary was produced.
 */
public class DashboardSummary {

    private long totalDevices;
    private long onlineDevices;
    private long offlineDevices;
    private int activeAlerts;
    private String systemStatus;
    private Instant lastUpdated;

    // --- device counts ---

    public long getTotalDevices() {
        return totalDevices;
    }

    public void setTotalDevices(long totalDevices) {
        this.totalDevices = totalDevices;
    }

    public long getOnlineDevices() {
        return onlineDevices;
    }

    public void setOnlineDevices(long onlineDevices) {
        this.onlineDevices = onlineDevices;
    }

    public long getOfflineDevices() {
        return offlineDevices;
    }

    public void setOfflineDevices(long offlineDevices) {
        this.offlineDevices = offlineDevices;
    }

    // --- alerts and status ---

    public int getActiveAlerts() {
        return activeAlerts;
    }

    public void setActiveAlerts(int activeAlerts) {
        this.activeAlerts = activeAlerts;
    }

    public String getSystemStatus() {
        return systemStatus;
    }

    public void setSystemStatus(String systemStatus) {
        this.systemStatus = systemStatus;
    }

    public Instant getLastUpdated() {
        return lastUpdated;
    }

    public void setLastUpdated(Instant lastUpdated) {
        this.lastUpdated = lastUpdated;
    }
}

Best Practices

  1. Scalability: Use message brokers for high-volume data
  2. Fault Tolerance: Implement retry mechanisms and circuit breakers
  3. Security: Encrypt sensitive data and use secure protocols
  4. Monitoring: Implement comprehensive logging and metrics
  5. Data Quality: Validate and clean sensor data
  6. Performance: Use efficient data structures and algorithms
// Example of circuit breaker pattern for external services
/**
 * Minimal circuit breaker with three states.
 *
 * <ul>
 *   <li>CLOSED: requests are allowed; failures are counted.</li>
 *   <li>OPEN: requests are rejected until {@code timeout} milliseconds have passed since
 *       the last recorded failure.</li>
 *   <li>HALF_OPEN: requests are allowed again; a success closes the breaker and a failure
 *       re-opens it.</li>
 * </ul>
 *
 * <p>The bean is shared between callers, so all state transitions are {@code synchronized}
 * to keep the counter and state consistent and visible across threads.
 */
@Component
public class CircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long timeout;
    private int failureCount = 0;
    private long lastFailureTime = 0;
    private State state = State.CLOSED;

    /**
     * @param failureThreshold number of counted failures at which the breaker opens
     * @param timeout          milliseconds to stay open before allowing a probe request
     */
    public CircuitBreaker(@Value("${circuitbreaker.failure-threshold}") int failureThreshold,
                          @Value("${circuitbreaker.timeout}") long timeout) {
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
    }

    /**
     * Decides whether a request may proceed, moving OPEN to HALF_OPEN once the timeout has
     * elapsed since the last failure.
     *
     * @return {@code true} when the request may be attempted
     */
    public synchronized boolean allowRequest() {
        if (state != State.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - lastFailureTime > timeout) {
            state = State.HALF_OPEN;
            return true;
        }
        return false;
    }

    /** Resets the failure counter and closes the breaker. */
    public synchronized void recordSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }

    /**
     * Counts a failure and opens the breaker when the threshold is reached or when the
     * failure happened during a half-open probe.
     */
    public synchronized void recordFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (state == State.HALF_OPEN || failureCount >= failureThreshold) {
            state = State.OPEN;
        }
    }
}

Conclusion

This comprehensive IoT sensor data processing system provides:

  • Real-time data ingestion from multiple protocols (MQTT, Kafka, HTTP)
  • Advanced data processing with anomaly detection and trend analysis
  • Scalable storage using time-series and document databases
  • Real-time monitoring with WebSocket updates
  • Alert management for proactive issue detection
  • RESTful APIs for integration and management

The modular architecture allows for easy extension and adaptation to specific IoT use cases, from smart home applications to industrial monitoring systems.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper