Overview
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol ideal for IoT applications. Eclipse Paho provides open-source client implementations for MQTT.
Setup and Dependencies
Maven Dependencies
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <!-- For MQTT v5 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version> </dependency> </dependencies>
Basic MQTT Client Implementation
1. Simple MQTT Client
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SimpleMqttClient {
private static final String BROKER_URL = "tcp://localhost:1883";
private static final String CLIENT_ID = "JavaMQTTClient";
private static final int QOS = 1;
private IMqttClient client;
private MemoryPersistence persistence;
public SimpleMqttClient() throws MqttException {
this.persistence = new MemoryPersistence();
this.client = new MqttClient(BROKER_URL, CLIENT_ID, persistence);
}
public void connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(30);
System.out.println("Connecting to broker: " + BROKER_URL);
client.connect(options);
System.out.println("Connected successfully!");
}
public void disconnect() throws MqttException {
client.disconnect();
System.out.println("Disconnected from broker");
}
public void subscribe(String topic) throws MqttException {
client.subscribe(topic, QOS, this::handleMessage);
System.out.println("Subscribed to topic: " + topic);
}
public void unsubscribe(String topic) throws MqttException {
client.unsubscribe(topic);
System.out.println("Unsubscribed from topic: " + topic);
}
public void publish(String topic, String message) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(QOS);
mqttMessage.setRetained(false);
client.publish(topic, mqttMessage);
System.out.println("Message published to " + topic + ": " + message);
}
private void handleMessage(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("Received message from " + topic + ": " + payload);
}
public static void main(String[] args) {
try {
SimpleMqttClient mqttClient = new SimpleMqttClient();
mqttClient.connect();
// Subscribe to a topic
mqttClient.subscribe("test/topic");
// Publish a message
mqttClient.publish("test/topic", "Hello MQTT from Java!");
// Wait for messages
Thread.sleep(5000);
mqttClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. Advanced MQTT Client with Callbacks
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AdvancedMqttClient implements MqttCallback {
private IMqttClient client;
private String brokerUrl;
private String clientId;
private CountDownLatch connectionLatch;
private boolean isConnected = false;
public AdvancedMqttClient(String brokerUrl, String clientId) {
this.brokerUrl = brokerUrl;
this.clientId = clientId;
this.connectionLatch = new CountDownLatch(1);
}
public void connect() throws MqttException {
try {
this.client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
client.setCallback(this);
MqttConnectOptions options = createConnectionOptions();
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(options);
// Wait for connection acknowledgement
if (connectionLatch.await(10, TimeUnit.SECONDS)) {
System.out.println("Connected successfully!");
} else {
throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MqttException(e);
}
}
private MqttConnectOptions createConnectionOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(30);
options.setKeepAliveInterval(60);
options.setMaxInflight(100);
// Optional: Set last will and testament
options.setWill("client/status", "offline".getBytes(), 1, true);
return options;
}
public void connectWithAuth(String username, String password) throws MqttException {
MqttConnectOptions options = createConnectionOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);
}
public void connectWithSSL(String username, String password, String caFilePath)
throws MqttException {
MqttConnectOptions options = createConnectionOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
// SSL configuration would go here
// options.setSocketFactory(sslContext.getSocketFactory());
client.connect(options);
}
public void subscribe(String topic, int qos) throws MqttException {
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic + " with QoS: " + qos);
}
public void subscribe(String[] topics, int[] qos) throws MqttException {
client.subscribe(topics, qos);
System.out.println("Subscribed to multiple topics");
}
public void publish(String topic, String message, int qos, boolean retained)
throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
client.publish(topic, mqttMessage);
System.out.println("Published to " + topic + " [QoS:" + qos + "]: " + message);
}
public void publish(String topic, byte[] payload, int qos, boolean retained)
throws MqttException {
MqttMessage mqttMessage = new MqttMessage(payload);
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
client.publish(topic, mqttMessage);
}
// MQTT Callback methods
@Override
public void connectionLost(Throwable cause) {
System.err.println("Connection lost: " + cause.getMessage());
isConnected = false;
// Implement reconnection logic
try {
System.out.println("Attempting to reconnect...");
client.reconnect();
} catch (MqttException e) {
System.err.println("Reconnection failed: " + e.getMessage());
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
System.out.printf("Message arrived [%s]: %s (QoS: %d, Retained: %b)%n",
topic, payload, message.getQos(), message.isRetained());
// Process message based on topic
processMessage(topic, payload, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("Message delivery complete: " + token.getMessageId());
} catch (MqttException e) {
System.err.println("Error getting message ID: " + e.getMessage());
}
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connection complete. Reconnected: " + reconnect);
isConnected = true;
connectionLatch.countDown();
}
private void processMessage(String topic, String payload, MqttMessage message) {
// Implement message processing logic based on your application needs
switch (topic) {
case "sensors/temperature":
handleTemperatureMessage(payload);
break;
case "sensors/humidity":
handleHumidityMessage(payload);
break;
case "commands/restart":
handleRestartCommand(payload);
break;
default:
System.out.println("Unknown topic: " + topic);
}
}
private void handleTemperatureMessage(String payload) {
try {
double temperature = Double.parseDouble(payload);
System.out.println("Temperature reading: " + temperature + "°C");
if (temperature > 30.0) {
publish("alerts/overheat", "Temperature too high: " + temperature, 1, false);
}
} catch (NumberFormatException e) {
System.err.println("Invalid temperature format: " + payload);
}
}
private void handleHumidityMessage(String payload) {
try {
double humidity = Double.parseDouble(payload);
System.out.println("Humidity reading: " + humidity + "%");
} catch (NumberFormatException e) {
System.err.println("Invalid humidity format: " + payload);
}
}
private void handleRestartCommand(String payload) {
System.out.println("Received restart command: " + payload);
// Implement restart logic
}
public boolean isConnected() {
return isConnected && client.isConnected();
}
public void disconnect() throws MqttException {
if (client != null && client.isConnected()) {
// Publish disconnect status
publish("client/status", "disconnecting", 1, true);
client.disconnect();
System.out.println("Disconnected from broker");
}
}
public void close() throws MqttException {
disconnect();
client.close();
}
}
MQTT v5 Client Implementation
3. MQTT v5 Client
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.*;
import org.eclipse.paho.mqttv5.common.packet.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
public class MqttV5Client implements MqttCallback {
private IMqttClient client;
private MqttConnectionOptions options;
public MqttV5Client(String brokerUrl, String clientId) throws MqttException {
this.client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
this.client.setCallback(this);
this.options = new MqttConnectionOptions();
}
public void connect() throws MqttException {
// MQTT v5 specific options
options.setAutomaticReconnect(true);
options.setCleanStart(true);
options.setSessionExpiryInterval(3600L); // 1 hour
options.setKeepAliveInterval(60);
// Set connection properties
MqttConnectionProperties properties = new MqttConnectionProperties();
properties.setSessionExpiryInterval(3600L);
properties.setReceiveMaximum(100);
options.setConnectionProperties(properties);
client.connect(options);
System.out.println("Connected with MQTT v5");
}
public void subscribeWithProperties(String topic, int qos) throws MqttException {
MqttSubscription subscription = new MqttSubscription(topic, qos);
MqttSubscribeOptions subscribeOptions = new MqttSubscribeOptions();
subscribeOptions.setSubscriptions(new MqttSubscription[]{subscription});
// Set subscription properties
MqttProperties properties = new MqttProperties();
// Add any subscription properties needed
client.subscribe(new MqttSubscription[]{subscription}, properties);
System.out.println("Subscribed to: " + topic);
}
public void publishWithProperties(String topic, String message, int qos,
boolean retained) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes(), qos, retained, null);
// Set message properties
MqttProperties properties = new MqttProperties();
properties.setMessageExpiryInterval(3600L); // Message expires after 1 hour
mqttMessage.setProperties(properties);
client.publish(topic, mqttMessage);
}
// MQTT v5 callback methods
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
System.out.println("Disconnected: " + disconnectResponse);
}
@Override
public void mqttErrorOccurred(MqttException exception) {
System.err.println("MQTT Error: " + exception.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.printf("Message [%s]: %s (QoS: %d)%n",
topic, new String(message.getPayload()), message.getQos());
// Access MQTT v5 properties
MqttProperties properties = message.getProperties();
if (properties.getMessageExpiryInterval() != null) {
System.out.println("Message expiry: " + properties.getMessageExpiryInterval());
}
}
@Override
public void deliveryComplete(IMqttToken token) {
System.out.println("Delivery complete");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connect complete. Reconnected: " + reconnect);
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
System.out.println("Auth packet arrived: " + reasonCode);
}
}
Practical Examples
4. IoT Sensor Data Publisher
import org.eclipse.paho.client.mqttv3.*;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SensorDataPublisher {
private AdvancedMqttClient mqttClient;
private ScheduledExecutorService scheduler;
private Random random;
private String deviceId;
public SensorDataPublisher(String brokerUrl, String deviceId) throws MqttException {
this.mqttClient = new AdvancedMqttClient(brokerUrl, "sensor-publisher-" + deviceId);
this.scheduler = Executors.newScheduledThreadPool(1);
this.random = new Random();
this.deviceId = deviceId;
}
public void start() throws MqttException {
mqttClient.connect();
// Publish device online status
mqttClient.publish("devices/" + deviceId + "/status", "online", 1, true);
// Start publishing sensor data
scheduler.scheduleAtFixedRate(this::publishSensorData, 0, 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(this::publishSystemMetrics, 0, 30, TimeUnit.SECONDS);
System.out.println("Sensor data publisher started for device: " + deviceId);
}
private void publishSensorData() {
try {
// Simulate temperature sensor (15-35°C)
double temperature = 15 + random.nextDouble() * 20;
mqttClient.publish("sensors/" + deviceId + "/temperature",
String.format("%.2f", temperature), 0, false);
// Simulate humidity sensor (30-80%)
double humidity = 30 + random.nextDouble() * 50;
mqttClient.publish("sensors/" + deviceId + "/humidity",
String.format("%.2f", humidity), 0, false);
// Simulate pressure sensor (980-1020 hPa)
double pressure = 980 + random.nextDouble() * 40;
mqttClient.publish("sensors/" + deviceId + "/pressure",
String.format("%.2f", pressure), 0, false);
} catch (MqttException e) {
System.err.println("Failed to publish sensor data: " + e.getMessage());
}
}
private void publishSystemMetrics() {
try {
// Simulate system metrics
long uptime = System.currentTimeMillis();
double cpuUsage = random.nextDouble() * 100;
long memoryUsage = 50 + random.nextInt(50); // 50-100 MB
String metrics = String.format(
"{\"uptime\": %d, \"cpu_usage\": %.2f, \"memory_usage\": %d}",
uptime, cpuUsage, memoryUsage
);
mqttClient.publish("devices/" + deviceId + "/metrics", metrics, 1, false);
} catch (MqttException e) {
System.err.println("Failed to publish system metrics: " + e.getMessage());
}
}
public void stop() {
try {
scheduler.shutdown();
mqttClient.publish("devices/" + deviceId + "/status", "offline", 1, true);
mqttClient.disconnect();
System.out.println("Sensor data publisher stopped");
} catch (MqttException e) {
System.err.println("Error during shutdown: " + e.getMessage());
}
}
public static void main(String[] args) {
try {
SensorDataPublisher publisher = new SensorDataPublisher(
"tcp://localhost:1883", "sensor-001"
);
publisher.start();
// Run for 5 minutes then stop
Thread.sleep(300000);
publisher.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5. Message Broker with Multiple Subscribers
import org.eclipse.paho.client.mqttv3.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class MqttMessageBroker {
private AdvancedMqttClient mqttClient;
private Map<String, List<MessageHandler>> topicHandlers;
private String clientId;
public MqttMessageBroker(String brokerUrl, String clientId) throws MqttException {
this.mqttClient = new AdvancedMqttClient(brokerUrl, clientId);
this.topicHandlers = new ConcurrentHashMap<>();
this.clientId = clientId;
}
public void start() throws MqttException {
mqttClient.connect();
// Subscribe to command topics
mqttClient.subscribe("commands/#", 1);
mqttClient.subscribe("alerts/#", 2);
System.out.println("Message broker started: " + clientId);
}
public void registerHandler(String topic, MessageHandler handler) {
topicHandlers.computeIfAbsent(topic, k -> new ArrayList<>()).add(handler);
try {
mqttClient.subscribe(topic, 1);
} catch (MqttException e) {
System.err.println("Failed to subscribe to topic: " + topic);
}
}
public void unregisterHandler(String topic, MessageHandler handler) {
List<MessageHandler> handlers = topicHandlers.get(topic);
if (handlers != null) {
handlers.remove(handler);
if (handlers.isEmpty()) {
topicHandlers.remove(topic);
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
System.err.println("Failed to unsubscribe from topic: " + topic);
}
}
}
}
public void publishMessage(String topic, String message, int qos) {
try {
mqttClient.publish(topic, message, qos, false);
} catch (MqttException e) {
System.err.println("Failed to publish message: " + e.getMessage());
}
}
public void broadcastMessage(String baseTopic, String message, int qos) {
topicHandlers.keySet().stream()
.filter(topic -> topic.startsWith(baseTopic))
.forEach(topic -> publishMessage(topic, message, qos));
}
// Message handler interface
public interface MessageHandler {
void handleMessage(String topic, String message);
}
// Process incoming messages and dispatch to handlers
public void processIncomingMessage(String topic, String message) {
List<MessageHandler> handlers = topicHandlers.get(topic);
if (handlers != null) {
handlers.forEach(handler -> handler.handleMessage(topic, message));
}
// Also check for wildcard matches
topicHandlers.entrySet().stream()
.filter(entry -> matchesWildcard(topic, entry.getKey()))
.flatMap(entry -> entry.getValue().stream())
.forEach(handler -> handler.handleMessage(topic, message));
}
private boolean matchesWildcard(String topic, String pattern) {
String[] topicParts = topic.split("/");
String[] patternParts = pattern.split("/");
if (patternParts.length > topicParts.length) {
return false;
}
for (int i = 0; i < patternParts.length; i++) {
if ("#".equals(patternParts[i])) {
return true; // Multi-level wildcard matches remaining
}
if (!"+".equals(patternParts[i]) && !patternParts[i].equals(topicParts[i])) {
return false;
}
}
return patternParts.length == topicParts.length;
}
public void stop() {
try {
mqttClient.disconnect();
System.out.println("Message broker stopped: " + clientId);
} catch (MqttException e) {
System.err.println("Error during shutdown: " + e.getMessage());
}
}
}
6. MQTT Client Manager for Multiple Connections
import org.eclipse.paho.client.mqttv3.*;
import java.util.*;
import java.util.concurrent.*;
public class MqttClientManager {
private Map<String, AdvancedMqttClient> clients;
private ScheduledExecutorService healthCheckScheduler;
private Properties brokerConfig;
public MqttClientManager() {
this.clients = new ConcurrentHashMap<>();
this.healthCheckScheduler = Executors.newScheduledThreadPool(1);
this.brokerConfig = new Properties();
loadBrokerConfig();
}
private void loadBrokerConfig() {
// Load broker configurations from properties file or environment
brokerConfig.setProperty("primary.broker", "tcp://broker1.example.com:1883");
brokerConfig.setProperty("secondary.broker", "tcp://broker2.example.com:1883");
brokerConfig.setProperty("backup.broker", "tcp://localhost:1883");
}
public void createClient(String clientId, String brokerKey) throws MqttException {
String brokerUrl = brokerConfig.getProperty(brokerKey);
if (brokerUrl == null) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);
}
AdvancedMqttClient client = new AdvancedMqttClient(brokerUrl, clientId);
client.connect();
clients.put(clientId, client);
System.out.println("Created MQTT client: " + clientId);
}
public void publishToAll(String topic, String message, int qos) {
clients.values().forEach(client -> {
try {
if (client.isConnected()) {
client.publish(topic, message, qos, false);
}
} catch (MqttException e) {
System.err.println("Failed to publish to client: " + e.getMessage());
}
});
}
public void subscribeAll(String topic, int qos) {
clients.values().forEach(client -> {
try {
if (client.isConnected()) {
client.subscribe(topic, qos);
}
} catch (MqttException e) {
System.err.println("Failed to subscribe client: " + e.getMessage());
}
});
}
public void startHealthChecks() {
healthCheckScheduler.scheduleAtFixedRate(this::performHealthChecks, 0, 60, TimeUnit.SECONDS);
}
private void performHealthChecks() {
clients.forEach((clientId, client) -> {
try {
if (!client.isConnected()) {
System.out.println("Client " + clientId + " is disconnected. Attempting reconnect...");
// Implement reconnection logic
} else {
// Publish heartbeat
client.publish("heartbeat/" + clientId, "alive", 0, false);
}
} catch (MqttException e) {
System.err.println("Health check failed for " + clientId + ": " + e.getMessage());
}
});
}
public void closeClient(String clientId) {
AdvancedMqttClient client = clients.remove(clientId);
if (client != null) {
try {
client.close();
System.out.println("Closed MQTT client: " + clientId);
} catch (MqttException e) {
System.err.println("Error closing client " + clientId + ": " + e.getMessage());
}
}
}
public void shutdown() {
healthCheckScheduler.shutdown();
clients.keySet().forEach(this::closeClient);
System.out.println("MQTT Client Manager shutdown complete");
}
public static void main(String[] args) {
try {
MqttClientManager manager = new MqttClientManager();
// Create multiple clients
manager.createClient("client-1", "primary.broker");
manager.createClient("client-2", "secondary.broker");
// Subscribe to common topics
manager.subscribeAll("sensors/#", 1);
// Start health checks
manager.startHealthChecks();
// Publish test message
manager.publishToAll("test/topic", "Hello from manager", 1);
// Keep running
Thread.sleep(300000);
manager.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Error Handling and Best Practices
7. Robust MQTT Client with Comprehensive Error Handling
import org.eclipse.paho.client.mqttv3.*;
import java.util.concurrent.atomic.AtomicInteger;
public class RobustMqttClient {
private AdvancedMqttClient client;
private AtomicInteger connectionAttempts;
private long lastReconnectTime;
private static final long RECONNECT_DELAY = 5000; // 5 seconds
private static final int MAX_RECONNECT_ATTEMPTS = 5;
public RobustMqttClient(String brokerUrl, String clientId) throws MqttException {
this.client = new AdvancedMqttClient(brokerUrl, clientId);
this.connectionAttempts = new AtomicInteger(0);
}
public boolean connectWithRetry() {
while (connectionAttempts.get() < MAX_RECONNECT_ATTEMPTS) {
try {
client.connect();
connectionAttempts.set(0);
return true;
} catch (MqttException e) {
int attempts = connectionAttempts.incrementAndGet();
System.err.println("Connection attempt " + attempts + " failed: " + e.getMessage());
if (attempts >= MAX_RECONNECT_ATTEMPTS) {
System.err.println("Max reconnection attempts reached. Giving up.");
return false;
}
// Exponential backoff
long delay = RECONNECT_DELAY * (long) Math.pow(2, attempts - 1);
System.out.println("Waiting " + delay + "ms before next attempt...");
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
}
}
}
return false;
}
public void safePublish(String topic, String message, int qos, boolean retained) {
try {
if (!client.isConnected()) {
System.out.println("Client not connected. Attempting reconnect...");
if (!connectWithRetry()) {
System.err.println("Failed to reconnect. Message not published.");
return;
}
}
client.publish(topic, message, qos, retained);
} catch (MqttException e) {
System.err.println("Failed to publish message: " + e.getMessage());
handlePublishError(e, topic, message);
}
}
private void handlePublishError(MqttException e, String topic, String message) {
switch (e.getReasonCode()) {
case MqttException.REASON_CODE_CLIENT_NOT_CONNECTED:
System.out.println("Client not connected. Will attempt to reconnect.");
break;
case MqttException.REASON_CODE_CLIENT_TIMEOUT:
System.out.println("Publish timeout. Message may have been sent.");
break;
case MqttException.REASON_CODE_MAX_INFLIGHT:
System.out.println("Max inflight messages reached. Retrying...");
// Implement retry logic with backoff
break;
default:
System.err.println("Unhandled publish error: " + e.getMessage());
}
// Optionally store failed messages for later retry
storeFailedMessage(topic, message);
}
private void storeFailedMessage(String topic, String message) {
// Implement message persistence for later retry
// This could be to a file, database, or in-memory queue
System.out.println("Storing failed message for topic: " + topic);
}
public void setConnectionLostHandler(Runnable handler) {
// Custom connection lost handling can be implemented here
}
public void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("Shutting down MQTT client...");
client.disconnect();
} catch (MqttException e) {
System.err.println("Error during shutdown: " + e.getMessage());
}
}));
}
}
Key Features Covered
- Basic MQTT Operations: Connect, publish, subscribe, disconnect
- Quality of Service (QoS): Levels 0, 1, and 2
- Connection Options: Authentication, SSL/TLS, last will and testament
- Error Handling: Comprehensive exception handling and recovery
- MQTT v5 Features: New protocol capabilities
- Real-world Patterns: Sensor data publishing, message brokering
- Management: Multiple client management, health checks
- Best Practices: Reconnection strategies, error recovery
Common MQTT Topics Patterns
sensors/+/temperature- Single-level wildcardsensors/#- Multi-level wildcarddevices/device-id/status- Specific device statusalerts/+/critical- Critical alerts from any source
This comprehensive MQTT client implementation provides a solid foundation for building robust IoT applications and messaging systems using the Paho library in Java.