IoT (Internet of Things) sensor data processing involves collecting, analyzing, and acting upon data from connected devices. This comprehensive guide covers the entire pipeline from data ingestion to real-time analytics.
IoT Architecture Overview
Key Components:
- Sensors/Devices: Data generators (temperature, humidity, motion, etc.)
- Data Ingestion: Message brokers and protocols
- Processing Engine: Real-time and batch processing
- Storage: Time-series databases and data lakes
- Analytics: Real-time monitoring and machine learning
- Visualization: Dashboards and alerts
Dependencies and Setup
Maven Dependencies
<!-- Version properties referenced by the dependency declarations that follow. -->
<properties>
<spring-boot.version>3.1.0</spring-boot.version>
<!-- Applied to the spring-kafka artifact. -->
<kafka.version>3.4.0</kafka.version>
<influxdb.version>2.7.0</influxdb.version>
<!-- Despite its name, this property sets the version of the Eclipse Paho MQTT client artifact. -->
<mosquitto.version>1.2.5</mosquitto.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Message Brokers -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mosquitto.version}</version>
</dependency>
<!-- Databases -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Core Data Models
1. Sensor Data Models
import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.Instant;
import java.util.Map;
/**
 * One measurement reported by a single sensor of a device.
 *
 * <p>Plain mutable bean; the timestamp is (de)serialized as a UTC string using
 * the pattern declared on the field.
 */
public class SensorData {

    private String deviceId;
    private String sensorType;
    private double value;
    private String unit;
    private Map<String, Object> metadata;

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")
    private Instant timestamp;

    private Location location;

    /** Creates an empty instance; all reference fields are {@code null}. */
    public SensorData() {
    }

    /**
     * Creates a reading without metadata or location.
     *
     * @param deviceId   identifier of the reporting device
     * @param sensorType kind of sensor that produced the value
     * @param value      measured value
     * @param unit       unit of {@code value}
     * @param timestamp  time associated with the reading
     */
    public SensorData(String deviceId, String sensorType, double value, String unit, Instant timestamp) {
        this.deviceId = deviceId;
        this.sensorType = sensorType;
        this.value = value;
        this.unit = unit;
        this.timestamp = timestamp;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public Map<String, Object> getMetadata() {
        return metadata;
    }

    public void setMetadata(Map<String, Object> metadata) {
        this.metadata = metadata;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public Location getLocation() {
        return location;
    }

    public void setLocation(Location location) {
        this.location = location;
    }

    /** Returns a short description; metadata and location are not included. */
    @Override
    public String toString() {
        final String template = "SensorData{deviceId='%s', type='%s', value=%.2f, unit='%s', timestamp=%s}";
        return String.format(template, deviceId, sensorType, value, unit, timestamp);
    }
}
/**
 * Latitude/longitude pair with an optional altitude ({@code null} when absent).
 */
public class Location {

    private double latitude;
    private double longitude;
    private Double altitude;

    /** Creates a location at (0, 0) without altitude. */
    public Location() {
    }

    /** Creates a location without altitude. */
    public Location(double latitude, double longitude) {
        this(latitude, longitude, null);
    }

    /**
     * Creates a fully specified location.
     *
     * @param altitude may be {@code null} when unknown
     */
    public Location(double latitude, double longitude, Double altitude) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.altitude = altitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public double getLongitude() {
        return longitude;
    }

    public void setLongitude(double longitude) {
        this.longitude = longitude;
    }

    public Double getAltitude() {
        return altitude;
    }

    public void setAltitude(Double altitude) {
        this.altitude = altitude;
    }
}
/**
 * Message carrying several sensor readings of one device under a common
 * timestamp, together with the device status and location.
 */
public class MultiSensorData {

    private String deviceId;
    private Instant timestamp;
    /** Readings keyed by a string; the processor uses the key as the sensor type. */
    private Map<String, SensorReading> readings;
    private DeviceStatus status;
    private Location location;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, SensorReading> getReadings() {
        return readings;
    }

    public void setReadings(Map<String, SensorReading> readings) {
        this.readings = readings;
    }

    public DeviceStatus getStatus() {
        return status;
    }

    public void setStatus(DeviceStatus status) {
        this.status = status;
    }

    public Location getLocation() {
        return location;
    }

    public void setLocation(Location location) {
        this.location = location;
    }
}
/**
 * A single value inside a {@link MultiSensorData} message, with optional
 * min/max bounds and a quality indicator.
 */
public class SensorReading {

    private double value;
    private String unit;
    /** Optional lower bound; {@code null} when not provided. */
    private Double minValue;
    /** Optional upper bound; {@code null} when not provided. */
    private Double maxValue;
    private SensorQuality quality;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public Double getMinValue() {
        return minValue;
    }

    public void setMinValue(Double minValue) {
        this.minValue = minValue;
    }

    public Double getMaxValue() {
        return maxValue;
    }

    public void setMaxValue(Double maxValue) {
        this.maxValue = maxValue;
    }

    /**
     * Returns the quality that was set on this reading.
     *
     * @return the stored quality, or {@code null} if none was set
     */
    public SensorQuality getQuality() {
        // Return the stored field; a hard-coded constant here would hide whatever the device reported.
        return quality;
    }

    public void setQuality(SensorQuality quality) {
        this.quality = quality;
    }
}
/** Quality indicator attached to a {@code SensorReading}. */
public enum SensorQuality {
    EXCELLENT,
    GOOD,
    FAIR,
    POOR,
    ERROR
}
/** Operational state reported for a device. */
public enum DeviceStatus {
    ONLINE,
    OFFLINE,
    DEGRADED,
    MAINTENANCE
}
2. Device Management Models
/**
 * Descriptive record of a managed device: identity, hardware details,
 * installation location, current status and per-sensor configuration.
 */
public class IoTDevice {

    private String deviceId;
    private String deviceName;
    private String deviceType;
    private String manufacturer;
    private String firmwareVersion;
    private Location installationLocation;
    private Map<String, String> specifications;
    private DeviceStatus status;
    private Instant lastSeen;
    private Map<String, SensorConfig> sensorConfigs;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getDeviceName() {
        return deviceName;
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public String getDeviceType() {
        return deviceType;
    }

    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }

    public String getManufacturer() {
        return manufacturer;
    }

    public void setManufacturer(String manufacturer) {
        this.manufacturer = manufacturer;
    }

    public String getFirmwareVersion() {
        return firmwareVersion;
    }

    public void setFirmwareVersion(String firmwareVersion) {
        this.firmwareVersion = firmwareVersion;
    }

    public Location getInstallationLocation() {
        return installationLocation;
    }

    public void setInstallationLocation(Location installationLocation) {
        this.installationLocation = installationLocation;
    }

    public Map<String, String> getSpecifications() {
        return specifications;
    }

    public void setSpecifications(Map<String, String> specifications) {
        this.specifications = specifications;
    }

    public DeviceStatus getStatus() {
        return status;
    }

    public void setStatus(DeviceStatus status) {
        this.status = status;
    }

    public Instant getLastSeen() {
        return lastSeen;
    }

    public void setLastSeen(Instant lastSeen) {
        this.lastSeen = lastSeen;
    }

    public Map<String, SensorConfig> getSensorConfigs() {
        return sensorConfigs;
    }

    public void setSensorConfigs(Map<String, SensorConfig> sensorConfigs) {
        this.sensorConfigs = sensorConfigs;
    }
}
/**
 * Configuration of one sensor on a device: sampling rate, optional alert
 * thresholds, unit, retention period and an enabled flag.
 */
public class SensorConfig {

    private String sensorType;
    /** Sampling rate in Hz. */
    private double samplingRate;
    /** Optional lower threshold; {@code null} when unset. */
    private Double minThreshold;
    /** Optional upper threshold; {@code null} when unset. */
    private Double maxThreshold;
    private String unit;
    private int retentionDays;
    private boolean enabled;

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getSamplingRate() {
        return samplingRate;
    }

    public void setSamplingRate(double samplingRate) {
        this.samplingRate = samplingRate;
    }

    public Double getMinThreshold() {
        return minThreshold;
    }

    public void setMinThreshold(Double minThreshold) {
        this.minThreshold = minThreshold;
    }

    public Double getMaxThreshold() {
        return maxThreshold;
    }

    public void setMaxThreshold(Double maxThreshold) {
        this.maxThreshold = maxThreshold;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public int getRetentionDays() {
        return retentionDays;
    }

    public void setRetentionDays(int retentionDays) {
        this.retentionDays = retentionDays;
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}
Data Ingestion Layer
1. MQTT Client for Sensor Data
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * MQTT subscriber that receives sensor readings and device-status messages and
 * hands them to the {@link SensorDataProcessor}.
 *
 * <p>Implements {@link MqttCallbackExtended} so that subscriptions can be
 * restored after an automatic reconnect: the connection uses a clean session,
 * so the broker keeps no subscriptions across connections.
 */
@Component
public class MqttSensorClient implements MqttCallbackExtended {

    /** Last topic level of multi-sensor messages ({@code sensors/{deviceId}/multi}). */
    private static final String MULTI_SUFFIX = "/multi";
    /** Last topic level of device-status messages ({@code devices/{deviceId}/status}). */
    private static final String STATUS_SUFFIX = "/status";

    private MqttClient mqttClient;
    private final ObjectMapper objectMapper;
    private final SensorDataProcessor dataProcessor;
    private final String brokerUrl;
    private final String clientId;

    /**
     * @param objectMapper  JSON mapper used to decode payloads
     * @param dataProcessor target of all decoded messages
     * @param brokerUrl     broker URI taken from {@code mqtt.broker.url}
     */
    public MqttSensorClient(ObjectMapper objectMapper,
                            SensorDataProcessor dataProcessor,
                            @Value("${mqtt.broker.url}") String brokerUrl) {
        this.objectMapper = objectMapper;
        this.dataProcessor = dataProcessor;
        this.brokerUrl = brokerUrl;
        this.clientId = "sensor-processor-" + System.currentTimeMillis();
    }

    /**
     * Connects to the broker and subscribes to the sensor and status topics.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    public void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(60);

        mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        mqttClient.setCallback(this);
        mqttClient.connect(options);

        subscribeToTopics();
    }

    /** Subscribes to all handled topic filters at QoS 1. */
    private void subscribeToTopics() throws MqttException {
        String[] topics = {
            "sensors/+/temperature",
            "sensors/+/humidity",
            "sensors/+/pressure",
            "sensors/+/multi",
            "devices/+/status"
        };
        int[] qos = {1, 1, 1, 1, 1}; // Quality of Service level 1
        mqttClient.subscribe(topics, qos);
    }

    /**
     * Restores the subscriptions after an automatic reconnect. The initial
     * subscription is made by {@link #connect()}, so nothing is done for the
     * first connection.
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (!reconnect) {
            return;
        }
        try {
            subscribeToTopics();
        } catch (MqttException e) {
            System.err.println("Failed to re-subscribe after MQTT reconnect: " + e.getMessage());
        }
    }

    /**
     * Decodes the payload as UTF-8 JSON and dispatches it by the last topic
     * level. Processing errors are logged and swallowed so a single bad
     * message does not reach the MQTT client as a callback failure.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Explicit charset: the platform default is not guaranteed to be UTF-8.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        try {
            // Match on the final topic level only; a substring match would also
            // catch device ids such as "multi-7" or "status-gateway".
            if (topic.endsWith(MULTI_SUFFIX)) {
                MultiSensorData multiData = objectMapper.readValue(payload, MultiSensorData.class);
                dataProcessor.processMultiSensorData(multiData);
            } else if (topic.endsWith(STATUS_SUFFIX)) {
                handleDeviceStatus(topic, payload);
            } else {
                SensorData sensorData = objectMapper.readValue(payload, SensorData.class);
                dataProcessor.processSensorData(sensorData);
            }
        } catch (Exception e) {
            System.err.println("Error processing MQTT message: " + e.getMessage());
            // Could send to dead letter queue
        }
    }

    /** Parses a status payload and forwards it together with the device id from the topic. */
    private void handleDeviceStatus(String topic, String payload) throws Exception {
        String deviceId = extractDeviceIdFromTopic(topic);
        DeviceStatus status = objectMapper.readValue(payload, DeviceStatus.class);
        dataProcessor.updateDeviceStatus(deviceId, status);
    }

    /**
     * Returns the second topic level, assuming the format {@code devices/{deviceId}/status}.
     *
     * @throws IllegalArgumentException if the topic has fewer than two levels
     */
    private String extractDeviceIdFromTopic(String topic) {
        String[] parts = topic.split("/");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Topic does not contain a device id: " + topic);
        }
        return parts[1];
    }

    @Override
    public void connectionLost(Throwable cause) {
        String reason = (cause != null) ? cause.getMessage() : null;
        System.err.println("MQTT Connection lost: " + reason);
        // Reconnection itself is handled by the automatic-reconnect option set in connect().
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Not used for subscriptions
    }

    /**
     * Disconnects from the broker if currently connected.
     *
     * @throws MqttException if the disconnect fails
     */
    public void disconnect() throws MqttException {
        if (mqttClient != null && mqttClient.isConnected()) {
            mqttClient.disconnect();
        }
    }
}
2. Kafka Consumer for High-Volume Data
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Kafka listener for single and batched sensor readings delivered as JSON
 * strings. Decoded data is forwarded to the {@link SensorDataProcessor}.
 */
@Component
public class KafkaSensorConsumer {

    private final SensorDataProcessor dataProcessor;
    private final ObjectMapper objectMapper;

    public KafkaSensorConsumer(SensorDataProcessor dataProcessor, ObjectMapper objectMapper) {
        this.dataProcessor = dataProcessor;
        this.objectMapper = objectMapper;
    }

    /**
     * Handles one JSON-encoded {@link SensorData} record.
     *
     * @param message JSON payload
     * @param topic   topic the record was received from
     * @param key     record key, or {@code null} when the record has none
     */
    @KafkaListener(topics = "${kafka.topics.sensor-data}", groupId = "${kafka.consumer.group}")
    public void consumeSensorData(@Payload String message,
                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                  // Optional: a required header would reject every record published without a key.
                                  @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) String key) {
        try {
            SensorData sensorData = objectMapper.readValue(message, SensorData.class);
            dataProcessor.processSensorData(sensorData);
        } catch (Exception e) {
            System.err.println("Error processing Kafka message: " + e.getMessage());
            // Send to DLQ or error topic
        }
    }

    /**
     * Handles one record whose payload is a JSON array of {@link SensorData}.
     *
     * @param message JSON array payload
     */
    @KafkaListener(topics = "${kafka.topics.batch-sensor-data}", groupId = "${kafka.consumer.group}")
    public void consumeBatchSensorData(@Payload String message) {
        try {
            List<SensorData> batchData = objectMapper.readValue(message,
                objectMapper.getTypeFactory().constructCollectionType(List.class, SensorData.class));
            dataProcessor.processBatchSensorData(batchData);
        } catch (Exception e) {
            System.err.println("Error processing batch sensor data: " + e.getMessage());
        }
    }
}
Data Processing Engine
1. Core Sensor Data Processor
import org.springframework.stereotype.Component;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Core of the processing pipeline: validates incoming readings, keeps rolling
 * statistics per device/sensor pair, raises alerts, stores the data and passes
 * it to the analytics service.
 */
@Component
public class SensorDataProcessor {

    /** Z-score checks run only once more than this many samples were collected. */
    private static final long MIN_SAMPLES_FOR_ANOMALY_CHECK = 10;
    /** Number of standard deviations beyond which a reading counts as anomalous. */
    private static final double ANOMALY_Z_SCORE = 3.0;
    /** Fixed upper limit applied to readings of sensor type "temperature". */
    private static final double HIGH_TEMPERATURE_THRESHOLD = 40.0;
    /** Number of most recent readings kept per device/sensor pair. */
    private static final int STATISTICS_WINDOW = 1000;

    private final SensorDataRepository dataRepository;
    private final AlertService alertService;
    private final AnalyticsService analyticsService;
    private final Map<String, SensorStatistics> sensorStatistics;
    private final Map<String, Instant> deviceLastSeen;

    public SensorDataProcessor(SensorDataRepository dataRepository,
                               AlertService alertService,
                               AnalyticsService analyticsService) {
        this.dataRepository = dataRepository;
        this.alertService = alertService;
        this.analyticsService = analyticsService;
        this.sensorStatistics = new ConcurrentHashMap<>();
        this.deviceLastSeen = new ConcurrentHashMap<>();
    }

    /**
     * Runs the full pipeline for one reading. Invalid readings (including
     * {@code null}) are logged and dropped.
     *
     * @param sensorData reading to process
     */
    public void processSensorData(SensorData sensorData) {
        if (!validateSensorData(sensorData)) {
            System.err.println("Invalid sensor data: " + sensorData);
            return;
        }
        deviceLastSeen.put(sensorData.getDeviceId(), Instant.now());
        // The anomaly check runs before the statistics update, so the reading is
        // compared against a window that does not yet contain it.
        checkForAnomalies(sensorData);
        updateStatistics(sensorData);
        dataRepository.save(sensorData);
        analyticsService.processRealTimeData(sensorData);
    }

    /**
     * Updates the device status and processes every contained reading as an
     * individual {@link SensorData}. A {@code null} message, a missing readings
     * map and {@code null} map values are skipped.
     *
     * @param multiData combined message of one device
     */
    public void processMultiSensorData(MultiSensorData multiData) {
        if (multiData == null) {
            System.err.println("Invalid multi-sensor data: null");
            return;
        }
        updateDeviceStatus(multiData.getDeviceId(), multiData.getStatus());

        Map<String, SensorReading> readings = multiData.getReadings();
        if (readings == null) {
            return;
        }
        readings.forEach((sensorType, reading) -> {
            if (reading == null) {
                return;
            }
            SensorData sensorData = new SensorData();
            sensorData.setDeviceId(multiData.getDeviceId());
            sensorData.setSensorType(sensorType);
            sensorData.setValue(reading.getValue());
            sensorData.setUnit(reading.getUnit());
            sensorData.setTimestamp(multiData.getTimestamp());
            sensorData.setLocation(multiData.getLocation());
            processSensorData(sensorData);
        });
    }

    /**
     * Stores and analyses all valid readings of a batch in one go. Unlike
     * {@link #processSensorData(SensorData)} this path does not run anomaly
     * checks or update the rolling statistics.
     *
     * @param batchData readings to process; {@code null} is treated as empty
     */
    public void processBatchSensorData(List<SensorData> batchData) {
        if (batchData == null) {
            return;
        }
        List<SensorData> validData = batchData.stream()
            .filter(this::validateSensorData)
            .toList();
        dataRepository.saveAll(validData);
        analyticsService.processBatchData(validData);
    }

    /** Returns {@code true} when the reading has an id, a type, a timestamp and a finite value. */
    private boolean validateSensorData(SensorData sensorData) {
        if (sensorData == null) {
            return false;
        }
        if (sensorData.getDeviceId() == null || sensorData.getDeviceId().trim().isEmpty()) {
            return false;
        }
        if (sensorData.getSensorType() == null || sensorData.getSensorType().trim().isEmpty()) {
            return false;
        }
        if (sensorData.getTimestamp() == null) {
            return false;
        }
        return !Double.isNaN(sensorData.getValue()) && !Double.isInfinite(sensorData.getValue());
    }

    /** Raises an anomaly alert for z-score outliers, then applies the fixed threshold checks. */
    private void checkForAnomalies(SensorData sensorData) {
        SensorStatistics stats =
            sensorStatistics.get(statisticsKey(sensorData.getDeviceId(), sensorData.getSensorType()));
        if (stats != null && stats.getDataCount() > MIN_SAMPLES_FOR_ANOMALY_CHECK) {
            double stdDev = stats.getStandardDeviation();
            // With a zero (or NaN) deviation the z-score is Infinity or NaN, so the
            // comparison below would be meaningless; skip the check in that case.
            if (stdDev > 0.0) {
                double zScore = Math.abs((sensorData.getValue() - stats.getMean()) / stdDev);
                if (zScore > ANOMALY_Z_SCORE) {
                    alertService.sendAlert(buildAlert(sensorData, AlertType.ANOMALY,
                        String.format("Anomalous reading detected: %.2f (Z-score: %.2f)",
                            sensorData.getValue(), zScore)));
                }
            }
        }
        checkThresholdViolations(sensorData);
    }

    /**
     * Applies the hard-coded temperature limit. Configured per-device thresholds
     * would typically be consulted here instead.
     */
    private void checkThresholdViolations(SensorData sensorData) {
        if ("temperature".equals(sensorData.getSensorType())
                && sensorData.getValue() > HIGH_TEMPERATURE_THRESHOLD) {
            alertService.sendAlert(buildAlert(sensorData, AlertType.THRESHOLD_VIOLATION,
                String.format("High temperature alert: %.2f°C", sensorData.getValue())));
        }
    }

    /** Creates an alert that copies device, sensor, value and timestamp from the reading. */
    private Alert buildAlert(SensorData sensorData, AlertType type, String message) {
        Alert alert = new Alert();
        alert.setDeviceId(sensorData.getDeviceId());
        alert.setSensorType(sensorData.getSensorType());
        alert.setValue(sensorData.getValue());
        alert.setTimestamp(sensorData.getTimestamp());
        alert.setType(type);
        alert.setMessage(message);
        return alert;
    }

    /** Adds the reading to the rolling statistics of its device/sensor pair. */
    private void updateStatistics(SensorData sensorData) {
        SensorStatistics stats = sensorStatistics.computeIfAbsent(
            statisticsKey(sensorData.getDeviceId(), sensorData.getSensorType()),
            k -> new SensorStatistics(STATISTICS_WINDOW));
        stats.addValue(sensorData.getValue());
    }

    /** Builds the map key used for per-sensor statistics. */
    private static String statisticsKey(String deviceId, String sensorType) {
        return deviceId + ":" + sensorType;
    }

    /** Placeholder: currently does nothing. */
    public void updateDeviceStatus(String deviceId, DeviceStatus status) {
        // Update device status in database
        // This could also trigger alerts for device offline/online status changes
    }

    /**
     * @return the statistics of the given device/sensor pair, or {@code null}
     *         if no reading was processed for it yet
     */
    public SensorStatistics getSensorStatistics(String deviceId, String sensorType) {
        return sensorStatistics.get(statisticsKey(deviceId, sensorType));
    }

    /** @return a read-only view of all statistics, keyed by {@code deviceId:sensorType} */
    public Map<String, SensorStatistics> getAllStatistics() {
        return Collections.unmodifiableMap(sensorStatistics);
    }
}
2. Sensor Statistics Calculator
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Lock-guarded wrapper around a windowed {@link DescriptiveStatistics}.
 * Every accessor acquires the same {@link ReentrantLock} before touching the
 * underlying statistics object.
 */
public class SensorStatistics {

    private final DescriptiveStatistics stats;
    private final ReentrantLock lock;
    private final int windowSize;

    /**
     * @param windowSize window size handed to the underlying statistics object
     */
    public SensorStatistics(int windowSize) {
        this.windowSize = windowSize;
        this.lock = new ReentrantLock();
        this.stats = new DescriptiveStatistics(windowSize);
    }

    /** Adds one sample. */
    public void addValue(double value) {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            stats.addValue(value);
        } finally {
            guard.unlock();
        }
    }

    /** @return mean of the current samples */
    public double getMean() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getMean();
        } finally {
            guard.unlock();
        }
    }

    /** @return standard deviation of the current samples */
    public double getStandardDeviation() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getStandardDeviation();
        } finally {
            guard.unlock();
        }
    }

    /** @return smallest current sample */
    public double getMin() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getMin();
        } finally {
            guard.unlock();
        }
    }

    /** @return largest current sample */
    public double getMax() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getMax();
        } finally {
            guard.unlock();
        }
    }

    /**
     * @param percentile requested percentile, passed through unchanged
     * @return the percentile estimate of the current samples
     */
    public double getPercentile(double percentile) {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getPercentile(percentile);
        } finally {
            guard.unlock();
        }
    }

    /** @return number of samples currently held */
    public long getDataCount() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return stats.getN();
        } finally {
            guard.unlock();
        }
    }

    /** Discards all samples. */
    public void clear() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            stats.clear();
        } finally {
            guard.unlock();
        }
    }
}
Data Storage Layer
1. Time-Series Database (InfluxDB)
import com.influxdb.client.*;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
/**
 * InfluxDB-backed store for sensor readings. Readings are written as points of
 * the {@code sensor_data} measurement and queried back with Flux.
 */
@Repository
public class InfluxDbSensorRepository {

    private final WriteApiBlocking writeApi;
    private final QueryApi queryApi;
    private final String bucket;
    private final String org;

    /**
     * @param influxDBClient client that supplies the write and query APIs
     * @param bucket         target bucket, from {@code influxdb.bucket}
     * @param org            organisation, from {@code influxdb.org}
     */
    public InfluxDbSensorRepository(InfluxDBClient influxDBClient,
                                    @Value("${influxdb.bucket}") String bucket,
                                    @Value("${influxdb.org}") String org) {
        this.writeApi = influxDBClient.getWriteApiBlocking();
        this.queryApi = influxDBClient.getQueryApi();
        this.bucket = bucket;
        this.org = org;
    }

    /** Writes a single reading. */
    public void save(SensorData sensorData) {
        writeApi.writePoint(bucket, org, convertToPoint(sensorData));
    }

    /** Writes all readings of the list in one call. */
    public void saveAll(List<SensorData> sensorDataList) {
        List<Point> points = sensorDataList.stream()
            .map(this::convertToPoint)
            .toList();
        writeApi.writePoints(bucket, org, points);
    }

    /**
     * Maps a reading to a point: device id and sensor type become tags, value
     * and unit become fields, and the timestamp is written with millisecond
     * precision. Latitude and longitude are added as tags when a location is set.
     */
    private Point convertToPoint(SensorData sensorData) {
        Point point = Point.measurement("sensor_data")
            .addTag("device_id", sensorData.getDeviceId())
            .addTag("sensor_type", sensorData.getSensorType())
            .addField("value", sensorData.getValue())
            .addField("unit", sensorData.getUnit())
            .time(sensorData.getTimestamp().toEpochMilli(), WritePrecision.MS);
        if (sensorData.getLocation() != null) {
            point.addTag("latitude", String.valueOf(sensorData.getLocation().getLatitude()))
                .addTag("longitude", String.valueOf(sensorData.getLocation().getLongitude()));
        }
        return point;
    }

    /**
     * Queries the {@code value} field of one device/sensor pair in a time range.
     *
     * @param deviceId   device tag to match
     * @param sensorType sensor-type tag to match
     * @param startTime  range start
     * @param endTime    range stop
     * @return the query result mapped to {@link SensorData}
     */
    public List<SensorData> queryData(String deviceId, String sensorType,
                                      Instant startTime, Instant endTime) {
        // deviceId and sensorType may come straight from request paths, so they
        // are escaped before being embedded in Flux string literals.
        String fluxQuery = String.format(
            "from(bucket: \"%s\") " +
            "|> range(start: %s, stop: %s) " +
            "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
            "|> filter(fn: (r) => r.device_id == \"%s\") " +
            "|> filter(fn: (r) => r.sensor_type == \"%s\") " +
            "|> filter(fn: (r) => r._field == \"value\")",
            bucket, startTime, endTime,
            escapeFluxString(deviceId), escapeFluxString(sensorType));
        // NOTE(review): SensorData carries no InfluxDB column annotations in this
        // file; confirm the POJO mapping populates deviceId/sensorType/timestamp.
        return queryApi.query(fluxQuery, org, SensorData.class);
    }

    /**
     * Escapes backslashes, double quotes and the {@code ${} interpolation
     * marker so the text cannot terminate or extend a Flux string literal.
     * {@code null} is rendered as the text {@code null}, as {@code String.format} would.
     */
    private static String escapeFluxString(String raw) {
        return String.valueOf(raw)
            .replace("\\", "\\\\")
            .replace("\"", "\\\"")
            .replace("${", "\\${");
    }
}
2. MongoDB Repository for Device Management
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
/**
 * Spring Data MongoDB repository for {@link IoTDevice} documents.
 */
@Repository
public interface DeviceRepository extends MongoRepository<IoTDevice, String> {

    /** Looks up a device by its {@code deviceId} property. */
    Optional<IoTDevice> findByDeviceId(String deviceId);

    /** Finds all devices with the given {@code deviceType}. */
    List<IoTDevice> findByDeviceType(String deviceType);

    /** Finds all devices with the given {@code status}. */
    List<IoTDevice> findByStatus(DeviceStatus status);

    /**
     * Finds devices whose installation location lies between the given
     * latitude bounds and between the given longitude bounds.
     */
    List<IoTDevice> findByInstallationLocationLatitudeBetweenAndInstallationLocationLongitudeBetween(
        double minLat, double maxLat, double minLon, double maxLon);

    /** Finds devices whose {@code lastSeen} is earlier than {@code timestamp}. */
    @Query("{ 'lastSeen': { $lt: ?0 } }")
    List<IoTDevice> findDevicesOfflineSince(Instant timestamp);

    /** Finds devices whose {@code sensorConfigs} entry for {@code sensorType} is enabled. */
    @Query("{ 'sensorConfigs.?0.enabled': true }")
    List<IoTDevice> findDevicesWithSensorType(String sensorType);
}
Analytics and Monitoring
1. Real-time Analytics Service
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Keeps a rolling window and a trend tracker per device/sensor pair and prints
 * trend and batch summaries to standard output.
 */
@Service
public class AnalyticsService {

    private final Map<String, RollingWindow> rollingWindows = new ConcurrentHashMap<>();
    private final Map<String, TrendAnalysis> trendAnalyses = new ConcurrentHashMap<>();

    public AnalyticsService() {
    }

    /**
     * Feeds one reading into its 10-minute rolling window, then runs trend
     * analysis and pattern detection for the same series.
     */
    public void processRealTimeData(SensorData sensorData) {
        final String seriesKey = sensorData.getDeviceId() + ":" + sensorData.getSensorType();

        rollingWindows
            .computeIfAbsent(seriesKey, unused -> new RollingWindow(Duration.ofMinutes(10)))
            .addDataPoint(sensorData);

        performTrendAnalysis(sensorData, seriesKey);
        detectPatterns(sensorData, seriesKey);
    }

    /** Groups the batch by device/sensor pair and prints a summary per group. */
    public void processBatchData(List<SensorData> batchData) {
        Map<String, List<SensorData>> bySeries = new HashMap<>();
        for (SensorData reading : batchData) {
            String seriesKey = reading.getDeviceId() + ":" + reading.getSensorType();
            List<SensorData> bucket = bySeries.get(seriesKey);
            if (bucket == null) {
                bucket = new ArrayList<>();
                bySeries.put(seriesKey, bucket);
            }
            bucket.add(reading);
        }
        for (Map.Entry<String, List<SensorData>> group : bySeries.entrySet()) {
            performBatchAnalysis(group.getKey(), group.getValue());
        }
    }

    /**
     * Adds the reading to the series' trend tracker (last 100 points) and
     * reports the slope when its magnitude exceeds 0.1.
     */
    private void performTrendAnalysis(SensorData sensorData, String key) {
        TrendAnalysis tracker = trendAnalyses.computeIfAbsent(key, unused -> new TrendAnalysis(100));
        tracker.addDataPoint(sensorData);

        if (!tracker.hasEnoughData()) {
            return;
        }
        double slope = tracker.calculateSlope();
        if (Math.abs(slope) <= 0.1) {
            return;
        }
        System.out.printf("Significant trend detected for %s: slope=%.3f%n", key, slope);
        // Could trigger alerts or adaptive sampling
        System.out.println(slope > 0 ? "Increasing trend detected" : "Decreasing trend detected");
    }

    /** Placeholder: currently does nothing. */
    private void detectPatterns(SensorData sensorData, String key) {
        // Implement pattern detection logic
        // This could include:
        // - Seasonal patterns
        // - Spike detection
        // - Correlation between different sensors
    }

    /** Prints count, mean and standard deviation of one group's values. */
    private void performBatchAnalysis(String key, List<SensorData> data) {
        DescriptiveStatistics summary = new DescriptiveStatistics();
        for (SensorData reading : data) {
            summary.addValue(reading.getValue());
        }
        System.out.printf("Batch Analysis for %s: count=%d, mean=%.2f, std=%.2f%n",
            key, summary.getN(), summary.getMean(), summary.getStandardDeviation());
    }

    /** @return the rolling window of the given series, or {@code null} if none exists */
    public RollingWindow getRollingWindow(String deviceId, String sensorType) {
        return rollingWindows.get(deviceId + ":" + sensorType);
    }
}
/**
 * Time-bounded buffer of readings. On every insert, leading entries older than
 * the window duration are dropped and the buffer is capped at 1000 entries.
 */
public class RollingWindow {

    private final LinkedList<SensorData> dataPoints = new LinkedList<>();
    private final Duration windowDuration;
    private final int maxPoints;

    /**
     * @param windowDuration maximum age of the entries kept at the head of the buffer
     */
    public RollingWindow(Duration windowDuration) {
        this.windowDuration = windowDuration;
        this.maxPoints = 1000; // Prevent memory issues
    }

    /** Appends a reading, then evicts expired and surplus entries from the front. */
    public void addDataPoint(SensorData dataPoint) {
        dataPoints.addLast(dataPoint);

        final Instant cutoff = Instant.now().minus(windowDuration);
        while (headIsOlderThan(cutoff)) {
            dataPoints.removeFirst();
        }

        for (int surplus = dataPoints.size() - maxPoints; surplus > 0; surplus--) {
            dataPoints.removeFirst();
        }
    }

    /** Tells whether the buffer is non-empty and its first entry predates {@code cutoff}. */
    private boolean headIsOlderThan(Instant cutoff) {
        return !dataPoints.isEmpty() && dataPoints.getFirst().getTimestamp().isBefore(cutoff);
    }

    /** @return a copy of the buffered readings, oldest first */
    public List<SensorData> getDataPoints() {
        return new ArrayList<>(dataPoints);
    }

    /** @return number of buffered readings */
    public int getSize() {
        return dataPoints.size();
    }

    /** @return the mean of the buffered values, or empty when nothing is buffered */
    public Optional<Double> getAverage() {
        final int count = dataPoints.size();
        if (count == 0) {
            return Optional.empty();
        }
        final double total = dataPoints.stream()
            .mapToDouble(SensorData::getValue)
            .sum();
        return Optional.of(total / count);
    }
}
/**
 * Keeps the most recent readings of one series and estimates their trend as
 * the slope of a least-squares line over the insertion index.
 */
public class TrendAnalysis {

    /** Minimum number of points before a trend is considered meaningful. */
    private static final int MIN_POINTS_FOR_TREND = 10;

    private final LinkedList<SensorData> dataPoints;
    private final int maxPoints;

    /**
     * @param maxPoints maximum number of readings retained
     */
    public TrendAnalysis(int maxPoints) {
        this.dataPoints = new LinkedList<>();
        this.maxPoints = maxPoints;
    }

    /** Appends a reading and drops the oldest one when the limit is exceeded. */
    public void addDataPoint(SensorData dataPoint) {
        dataPoints.addLast(dataPoint);
        if (dataPoints.size() > maxPoints) {
            dataPoints.removeFirst();
        }
    }

    /** @return {@code true} once at least 10 readings are retained */
    public boolean hasEnoughData() {
        return dataPoints.size() >= MIN_POINTS_FOR_TREND;
    }

    /**
     * Computes the slope of a simple linear regression of value against the
     * position in the buffer (0, 1, 2, ...).
     *
     * @return the slope, or {@code 0.0} when fewer than two readings are retained
     */
    public double calculateSlope() {
        final int n = dataPoints.size();
        if (n < 2) {
            return 0.0;
        }
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
        int index = 0;
        // Iterate instead of calling get(i): indexed access on a LinkedList walks
        // the list each time and makes the loop quadratic.
        for (SensorData point : dataPoints) {
            double x = index++; // Time index
            double y = point.getValue();
            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumX2 += x * x;
        }
        return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
    }
}
2. Alert Management System
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory alert store and dispatcher. Alerts are kept per device and pushed
 * to all registered {@link AlertListener}s.
 *
 * <p>The per-device lists are synchronized lists and the listener list is
 * copy-on-write, so alerts may be sent, read, cleared and listeners registered
 * from different threads.
 */
@Service
public class AlertService {

    private final Map<String, List<Alert>> deviceAlerts;
    private final List<AlertListener> listeners;

    public AlertService() {
        this.deviceAlerts = new ConcurrentHashMap<>();
        // Copy-on-write: listeners can be added while an alert is being dispatched.
        this.listeners = new java.util.concurrent.CopyOnWriteArrayList<>();
    }

    /**
     * Stores the alert under its device id, notifies all listeners and prints
     * the alert message.
     *
     * @param alert alert to record; its device id is used as the map key
     */
    public void sendAlert(Alert alert) {
        // The add happens outside computeIfAbsent, so the list itself must be thread-safe.
        deviceAlerts
            .computeIfAbsent(alert.getDeviceId(), k -> Collections.synchronizedList(new ArrayList<>()))
            .add(alert);
        notifyListeners(alert);
        System.out.println("ALERT: " + alert.getMessage());
    }

    /** Registers a listener that is called for every subsequent alert. */
    public void addListener(AlertListener listener) {
        listeners.add(listener);
    }

    /** Calls every listener; a failing listener is logged and does not stop the others. */
    private void notifyListeners(Alert alert) {
        for (AlertListener listener : listeners) {
            try {
                listener.onAlert(alert);
            } catch (Exception e) {
                System.err.println("Error notifying alert listener: " + e.getMessage());
            }
        }
    }

    /**
     * @param deviceId device whose alerts are requested
     * @param since    only alerts strictly after this instant are returned
     * @return an unmodifiable list of matching alerts; empty if the device has none
     */
    public List<Alert> getAlertsForDevice(String deviceId, Instant since) {
        List<Alert> deviceAlertList = deviceAlerts.get(deviceId);
        if (deviceAlertList == null) {
            return List.of();
        }
        // Traversing a synchronized list requires holding its monitor.
        synchronized (deviceAlertList) {
            return deviceAlertList.stream()
                .filter(alert -> alert.getTimestamp().isAfter(since))
                .toList();
        }
    }

    /** Removes, for every device, the alerts whose timestamp is before {@code cutoff}. */
    public void clearOldAlerts(Instant cutoff) {
        for (List<Alert> alerts : deviceAlerts.values()) {
            alerts.removeIf(alert -> alert.getTimestamp().isBefore(cutoff));
        }
    }
}
/**
 * Notification about a noteworthy reading or device condition.
 */
public class Alert {

    private String deviceId;
    private String sensorType;
    private double value;
    private Instant timestamp;
    private AlertType type;
    private String message;
    private AlertSeverity severity;

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getSensorType() {
        return sensorType;
    }

    public void setSensorType(String sensorType) {
        this.sensorType = sensorType;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public AlertType getType() {
        return type;
    }

    public void setType(AlertType type) {
        this.type = type;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public AlertSeverity getSeverity() {
        return severity;
    }

    public void setSeverity(AlertSeverity severity) {
        this.severity = severity;
    }
}
/** Category of an {@code Alert}. */
public enum AlertType {
    THRESHOLD_VIOLATION,
    ANOMALY,
    DEVICE_OFFLINE,
    DEVICE_ERROR,
    MAINTENANCE_REQUIRED
}
/** Severity level of an {@code Alert}, declared from lowest to highest. */
public enum AlertSeverity {
    LOW,
    MEDIUM,
    HIGH,
    CRITICAL
}
/**
 * Callback invoked for every alert sent through the alert service.
 *
 * <p>Declared as a functional interface so listeners can be supplied as
 * lambdas or method references.
 */
@FunctionalInterface
public interface AlertListener {

    /**
     * Called once per alert.
     *
     * @param alert the alert that was raised
     */
    void onAlert(Alert alert);
}
REST API Controllers
1. Sensor Data API
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import java.time.Instant;
import java.util.List;
@RestController
@RequestMapping("/api/sensors")
public class SensorDataController {
private final SensorDataProcessor dataProcessor;
private final SensorDataRepository dataRepository;
private final AnalyticsService analyticsService;
public SensorDataController(SensorDataProcessor dataProcessor,
SensorDataRepository dataRepository,
AnalyticsService analyticsService) {
this.dataProcessor = dataProcessor;
this.dataRepository = dataRepository;
this.analyticsService = analyticsService;
}
@PostMapping("/data")
public ResponseEntity<SensorData> receiveSensorData(@RequestBody SensorData sensorData) {
sensorData.setTimestamp(Instant.now()); // Ensure timestamp is set
dataProcessor.processSensorData(sensorData);
return ResponseEntity.ok(sensorData);
}
@PostMapping("/data/batch")
public ResponseEntity<String> receiveBatchSensorData(@RequestBody List<SensorData> sensorDataList) {
dataProcessor.processBatchSensorData(sensorDataList);
return ResponseEntity.ok("Processed " + sensorDataList.size() + " data points");
}
@GetMapping("/data/{deviceId}/{sensorType}")
public ResponseEntity<List<SensorData>> getSensorData(
@PathVariable String deviceId,
@PathVariable String sensorType,
@RequestParam Instant startTime,
@RequestParam Instant endTime) {
List<SensorData> data = dataRepository.queryData(deviceId, sensorType, startTime, endTime);
return ResponseEntity.ok(data);
}
@GetMapping("/statistics/{deviceId}/{sensorType}")
public ResponseEntity<SensorStatistics> getSensorStatistics(
@PathVariable String deviceId,
@PathVariable String sensorType) {
SensorStatistics stats = dataProcessor.getSensorStatistics(deviceId, sensorType);
return ResponseEntity.ok(stats);
}
@GetMapping("/analytics/{deviceId}/{sensorType}")
public ResponseEntity<RollingWindow> getAnalyticsData(
@PathVariable String deviceId,
@PathVariable String sensorType) {
RollingWindow window = analyticsService.getRollingWindow(deviceId, sensorType);
return ResponseEntity.ok(window);
}
}
2. Device Management API
/**
 * REST endpoints under {@code /api/devices} for registering devices, looking them up,
 * listing their recent alerts and replacing their sensor configuration.
 */
@RestController
@RequestMapping("/api/devices")
public class DeviceController {
    private final DeviceRepository deviceRepository;
    private final AlertService alertService;

    public DeviceController(DeviceRepository deviceRepository, AlertService alertService) {
        this.deviceRepository = deviceRepository;
        this.alertService = alertService;
    }

    /**
     * Saves the posted device.
     *
     * @param device the device from the request body
     * @return 200 with the instance returned by the repository
     */
    @PostMapping
    public ResponseEntity<IoTDevice> registerDevice(@RequestBody IoTDevice device) {
        IoTDevice savedDevice = deviceRepository.save(device);
        return ResponseEntity.ok(savedDevice);
    }

    /**
     * Looks a device up by its device id.
     *
     * @param deviceId device identifier from the path
     * @return 200 with the device, or 404 when the repository has no match
     */
    @GetMapping("/{deviceId}")
    public ResponseEntity<IoTDevice> getDevice(@PathVariable String deviceId) {
        Optional<IoTDevice> device = deviceRepository.findByDeviceId(deviceId);
        // orElseGet: only build the 404 response when it is actually needed.
        return device.map(ResponseEntity::ok)
                .orElseGet(() -> ResponseEntity.notFound().build());
    }

    /**
     * Lists the alerts raised for a device during the last {@code hours} hours.
     *
     * @param deviceId device identifier from the path
     * @param hours    size of the look-back window in hours; defaults to 1
     * @return 200 with the matching alerts, or 400 when {@code hours} is negative
     */
    @GetMapping("/{deviceId}/alerts")
    public ResponseEntity<List<Alert>> getDeviceAlerts(
            @PathVariable String deviceId,
            @RequestParam(defaultValue = "1") int hours) {
        // A negative window would place the cutoff in the future and silently return nothing.
        if (hours < 0) {
            return ResponseEntity.badRequest().build();
        }
        // Multiply as long: in int arithmetic hours * 3600 overflows above 596,523 hours.
        Instant since = Instant.now().minusSeconds(hours * 3600L);
        List<Alert> alerts = alertService.getAlertsForDevice(deviceId, since);
        return ResponseEntity.ok(alerts);
    }

    /**
     * Lists the devices that currently have the given status.
     *
     * @param status status from the path
     * @return 200 with the devices returned by the repository
     */
    @GetMapping("/status/{status}")
    public ResponseEntity<List<IoTDevice>> getDevicesByStatus(@PathVariable DeviceStatus status) {
        List<IoTDevice> devices = deviceRepository.findByStatus(status);
        return ResponseEntity.ok(devices);
    }

    /**
     * Replaces the sensor configuration map of an existing device and saves it.
     *
     * @param deviceId      device identifier from the path
     * @param sensorConfigs new configuration map from the request body
     * @return 200 with the saved device, or 404 when the device does not exist
     */
    @PutMapping("/{deviceId}/config")
    public ResponseEntity<IoTDevice> updateDeviceConfig(
            @PathVariable String deviceId,
            @RequestBody Map<String, SensorConfig> sensorConfigs) {
        Optional<IoTDevice> deviceOpt = deviceRepository.findByDeviceId(deviceId);
        if (deviceOpt.isEmpty()) {
            return ResponseEntity.notFound().build();
        }
        IoTDevice device = deviceOpt.get();
        device.setSensorConfigs(sensorConfigs);
        IoTDevice updatedDevice = deviceRepository.save(device);
        return ResponseEntity.ok(updatedDevice);
    }
}
WebSocket for Real-time Updates
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * WebSocket handler that keeps track of connected sessions and broadcasts alerts and sensor
 * readings to all of them as JSON text messages of the form
 * {@code {"type": ..., "data": ..., "timestamp": ...}}.
 *
 * <p>Registers itself as an {@link AlertListener} with the alert service on construction.
 */
@Component
public class SensorWebSocketHandler extends TextWebSocketHandler implements AlertListener {
    private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
    private final ObjectMapper objectMapper;

    public SensorWebSocketHandler(ObjectMapper objectMapper, AlertService alertService) {
        this.objectMapper = objectMapper;
        alertService.addListener(this);
    }

    /** Starts tracking a newly opened session. */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        System.out.println("WebSocket connection established: " + session.getId());
    }

    /** Stops tracking a closed session. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        System.out.println("WebSocket connection closed: " + session.getId());
    }

    /** Broadcasts the alert to all connected clients with message type {@code "ALERT"}. */
    @Override
    public void onAlert(Alert alert) {
        broadcastMessage("ALERT", alert);
    }

    /** Broadcasts the reading to all connected clients with message type {@code "SENSOR_DATA"}. */
    public void broadcastSensorData(SensorData sensorData) {
        broadcastMessage("SENSOR_DATA", sensorData);
    }

    /**
     * Serializes one envelope and sends it to every open session.
     * A serialization failure aborts the broadcast; a send failure is logged and the
     * remaining sessions are still served.
     *
     * @param type value of the envelope's {@code type} field
     * @param data payload placed in the envelope's {@code data} field
     */
    private void broadcastMessage(String type, Object data) {
        Map<String, Object> message = new HashMap<>();
        message.put("type", type);
        message.put("data", data);
        // Fully qualified because this snippet's import list does not include java.time.Instant.
        message.put("timestamp", java.time.Instant.now());

        String jsonMessage;
        try {
            jsonMessage = objectMapper.writeValueAsString(message);
        } catch (IOException e) {
            System.err.println("Error serializing WebSocket message: " + e.getMessage());
            return;
        }

        TextMessage payload = new TextMessage(jsonMessage);
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            try {
                // Alerts and sensor data can be broadcast from different threads, and the
                // underlying WebSocket session does not allow concurrent sends.
                synchronized (session) {
                    session.sendMessage(payload);
                }
            } catch (IOException | IllegalStateException e) {
                // IllegalStateException covers a session that closed after the isOpen() check;
                // one failing client must not stop delivery to the others.
                System.err.println("Error sending WebSocket message: " + e.getMessage());
            }
        }
    }
}
Configuration Classes
1. Application Configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/** Spring configuration that declares the JSON mapper and the MQTT sensor client beans. */
@Configuration
public class AppConfig {

    /**
     * Creates the {@link ObjectMapper} bean with the {@code JavaTimeModule} registered.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper().registerModule(new JavaTimeModule());
    }

    /**
     * Creates the MQTT sensor client bean.
     *
     * @param objectMapper  mapper handed to the client
     * @param dataProcessor processor handed to the client
     * @param brokerUrl     broker URL read from the {@code mqtt.broker.url} property
     * @return a new client built from the three arguments
     */
    @Bean
    public MqttSensorClient mqttSensorClient(ObjectMapper objectMapper,
                                             SensorDataProcessor dataProcessor,
                                             @Value("${mqtt.broker.url}") String brokerUrl) {
        MqttSensorClient client = new MqttSensorClient(objectMapper, dataProcessor, brokerUrl);
        return client;
    }
}
2. Spring Boot Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point. Scheduling is enabled here, which the {@code @Scheduled}
 * methods elsewhere in this guide rely on.
 */
@SpringBootApplication
@EnableScheduling
public class IoTApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(IoTApplication.class, args);
    }
}
Usage Examples
1. Simulating Sensor Data
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.Random;
/**
 * Scheduled generator of synthetic readings: one single-value temperature sensor and one
 * multi-sensor device, both fed into the {@code SensorDataProcessor}.
 */
@Component
public class SensorDataSimulator {
    private final SensorDataProcessor dataProcessor;
    private final Random random = new Random();

    public SensorDataSimulator(SensorDataProcessor dataProcessor) {
        this.dataProcessor = dataProcessor;
    }

    /** Emits one temperature reading for {@code temp-sensor-001}; scheduled at a fixed rate of 5000 ms. */
    @Scheduled(fixedRate = 5000)
    public void simulateTemperatureSensor() {
        SensorData sample = new SensorData();
        sample.setDeviceId("temp-sensor-001");
        sample.setSensorType("temperature");
        // Gaussian noise scaled by 5 around a base of 20.
        sample.setValue(20 + random.nextGaussian() * 5);
        sample.setUnit("°C");
        sample.setTimestamp(Instant.now());
        dataProcessor.processSensorData(sample);
    }

    /**
     * Emits one combined temperature/humidity/pressure sample for {@code multi-sensor-001};
     * scheduled at a fixed rate of 10000 ms.
     */
    @Scheduled(fixedRate = 10000)
    public void simulateMultiSensorDevice() {
        MultiSensorData sample = new MultiSensorData();
        sample.setDeviceId("multi-sensor-001");
        sample.setTimestamp(Instant.now());
        sample.setStatus(DeviceStatus.ONLINE);

        // Random draws happen in the order temperature, humidity, pressure.
        Map<String, SensorReading> bySensor = new HashMap<>();
        bySensor.put("temperature", reading(22.5 + random.nextGaussian() * 2, "°C"));
        bySensor.put("humidity", reading(45 + random.nextGaussian() * 10, "%"));
        bySensor.put("pressure", reading(1013 + random.nextGaussian() * 5, "hPa"));

        sample.setReadings(bySensor);
        dataProcessor.processMultiSensorData(sample);
    }

    /** Builds a reading carrying the given value and unit. */
    private static SensorReading reading(double value, String unit) {
        SensorReading result = new SensorReading();
        result.setValue(value);
        result.setUnit(unit);
        return result;
    }
}
2. Monitoring Dashboard
/** REST endpoint under {@code /api/dashboard} that serves a summary of the device fleet. */
@RestController
@RequestMapping("/api/dashboard")
public class DashboardController {
    private final SensorDataProcessor dataProcessor;
    private final DeviceRepository deviceRepository;
    private final AnalyticsService analyticsService;

    public DashboardController(SensorDataProcessor dataProcessor,
                               DeviceRepository deviceRepository,
                               AnalyticsService analyticsService) {
        this.dataProcessor = dataProcessor;
        this.deviceRepository = deviceRepository;
        this.analyticsService = analyticsService;
    }

    /**
     * Builds the dashboard summary from repository counts.
     * Offline devices are computed as total minus online. The active-alert count is not
     * populated here and keeps its default value.
     *
     * @return 200 with the populated summary
     */
    @GetMapping("/summary")
    public ResponseEntity<DashboardSummary> getDashboardSummary() {
        // Device counts
        long total = deviceRepository.count();
        long online = deviceRepository.findByStatus(DeviceStatus.ONLINE).size();

        DashboardSummary result = new DashboardSummary();
        result.setTotalDevices(total);
        result.setOnlineDevices(online);
        result.setOfflineDevices(total - online);

        // Recent alerts count: this would typically query the alert service

        // System status
        result.setSystemStatus("OPERATIONAL");
        result.setLastUpdated(Instant.now());
        return ResponseEntity.ok(result);
    }
}
/**
 * Mutable data holder returned by the dashboard summary endpoint: device counts, the number of
 * active alerts, a system status string and the time the summary was produced.
 */
public class DashboardSummary {
    private long totalDevices;
    private long onlineDevices;
    private long offlineDevices;
    private int activeAlerts;
    private String systemStatus;
    private Instant lastUpdated;

    // Getters and setters

    /** @return total number of devices */
    public long getTotalDevices() {
        return totalDevices;
    }

    /** @param totalDevices total number of devices */
    public void setTotalDevices(long totalDevices) {
        this.totalDevices = totalDevices;
    }

    /** @return number of online devices */
    public long getOnlineDevices() {
        return onlineDevices;
    }

    /** @param onlineDevices number of online devices */
    public void setOnlineDevices(long onlineDevices) {
        this.onlineDevices = onlineDevices;
    }

    /** @return number of offline devices */
    public long getOfflineDevices() {
        return offlineDevices;
    }

    /** @param offlineDevices number of offline devices */
    public void setOfflineDevices(long offlineDevices) {
        this.offlineDevices = offlineDevices;
    }

    /** @return number of active alerts */
    public int getActiveAlerts() {
        return activeAlerts;
    }

    /** @param activeAlerts number of active alerts */
    public void setActiveAlerts(int activeAlerts) {
        this.activeAlerts = activeAlerts;
    }

    /** @return system status text */
    public String getSystemStatus() {
        return systemStatus;
    }

    /** @param systemStatus system status text */
    public void setSystemStatus(String systemStatus) {
        this.systemStatus = systemStatus;
    }

    /** @return instant the summary was last updated */
    public Instant getLastUpdated() {
        return lastUpdated;
    }

    /** @param lastUpdated instant the summary was last updated */
    public void setLastUpdated(Instant lastUpdated) {
        this.lastUpdated = lastUpdated;
    }
}
Best Practices
- Scalability: Use message brokers for high-volume data
- Fault Tolerance: Implement retry mechanisms and circuit breakers
- Security: Encrypt sensitive data and use secure protocols
- Monitoring: Implement comprehensive logging and metrics
- Data Quality: Validate and clean sensor data
- Performance: Use efficient data structures and algorithms
/**
 * Example of the circuit breaker pattern for external services.
 *
 * <p>The breaker starts CLOSED. Once {@code failureThreshold} failures have been recorded
 * without an intervening success it becomes OPEN and rejects requests. After {@code timeout}
 * milliseconds have passed since the last failure, the next {@link #allowRequest()} moves it to
 * HALF_OPEN and lets requests through again. A success closes the breaker and resets the
 * failure count; a failure while HALF_OPEN reopens it, because the count is only reset on
 * success.
 *
 * <p>All state-touching methods are synchronized: the breaker is a shared singleton bean, and
 * the counter increment and state transitions are not atomic on their own.
 */
@Component
public class CircuitBreaker {
    private final int failureThreshold;
    private final long timeout;
    private int failureCount = 0;
    private long lastFailureTime = 0;
    private State state = State.CLOSED;

    enum State { CLOSED, OPEN, HALF_OPEN }

    /**
     * @param failureThreshold number of recorded failures at which the breaker opens,
     *                         read from {@code circuitbreaker.failure-threshold}
     * @param timeout          time in milliseconds after the last failure before an open
     *                         breaker lets requests through again, read from
     *                         {@code circuitbreaker.timeout}
     */
    public CircuitBreaker(@Value("${circuitbreaker.failure-threshold}") int failureThreshold,
                          @Value("${circuitbreaker.timeout}") long timeout) {
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
    }

    /**
     * Decides whether a call may proceed.
     *
     * @return {@code false} only while the breaker is OPEN and the timeout has not yet elapsed
     */
    public synchronized boolean allowRequest() {
        if (state != State.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - lastFailureTime > timeout) {
            // Timeout elapsed: probe the external service again.
            state = State.HALF_OPEN;
            return true;
        }
        return false;
    }

    /** Records a successful call: resets the failure count and closes the breaker. */
    public synchronized void recordSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }

    /**
     * Records a failed call: bumps the failure count, remembers the failure time and opens the
     * breaker once the count has reached the threshold.
     */
    public synchronized void recordFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (failureCount >= failureThreshold) {
            state = State.OPEN;
        }
    }
}
Conclusion
This comprehensive IoT sensor data processing system provides:
- Real-time data ingestion from multiple protocols (MQTT, Kafka, HTTP)
- Advanced data processing with anomaly detection and trend analysis
- Scalable storage using time-series and document databases
- Real-time monitoring with WebSocket updates
- Alert management for proactive issue detection
- RESTful APIs for integration and management
The modular architecture allows for easy extension and adaptation to specific IoT use cases, from smart home applications to industrial monitoring systems.