Оффлайн-сообщения не используются в Moquette с Paho Client
У меня проблема с использованием автономных MQTT-сообщений на сервере Moquette через Paho-клиент Eclipse.
Ниже приведены шаги, которые я выполнил.
- Создал и раскрутил брокера Moquette MQTT.
- Создано простое потребительское приложение MQTT с использованием клиента eclipse Paho.
- Настроить потребителя на использование данных по теме: "устройства / сообщили /#" с QOS: 1 и CleanSession: False
- Создан простой издатель данных MQTT для публикации данных в брокере MQTT с использованием Eclipse Paho.
- Использовал издатель данных MQTT, чтобы публиковать сообщения в теме: "devices/reports /client_1" с QOS: 1
Вышеуказанные шаги были успешными без каких-либо проблем.
Затем я остановил свое потребительское приложение и отправил данные MQTT брокеру с той же темой. используя мое приложение издателя - сервер смог получить эти сообщения, но в этот момент не было ни одного потребителя, который мог бы использовать это сообщение, так как я остановил своего потребителя. Затем я снова запустил свое потребительское приложение. Он был успешно подключен к брокеру, но не получил ни одного сообщения, которое я отправил брокеру во время отключения потребителя.
Нужно ли выполнять какую-либо конкретную настройку для моего сервера Moquette для сохранения данных (с чистым сеансом: false)? Или я что-то упустил?
Пожалуйста, найдите мой пример кода ниже,
Инициализация сервера Moquette
package com.gbids.mqtt.moquette.main;
import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class ServerLauncher {
public static void main(String[] args) throws IOException {
Properties props = new Properties();
final IConfig configs = new MemoryConfig(props);
final Server mqttBroker = new Server();
final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
mqttBroker.startServer(configs, userHandlers);
System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("stopping moquette mqtt broker..");
mqttBroker.stopServer();
System.out.println("moquette mqtt broker stopped");
}
});
}
}
MQTT Consumer
package com.gbids.mqtt.moquette.main;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConsumerLauncher implements MqttCallback {
private static final String topicPrefix = "devices/reported";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "consumer";
public static void main(String[] args) throws MqttException {
final String clientId = "consumer_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
sampleClient.connect(connOpts);
sampleClient.subscribe(topicPrefix + "/#", 1);
sampleClient.setCallback(new ConsumerLauncher());
}
public void connectionLost(Throwable throwable) {
System.out.println("Consumer connection lost : " + throwable.getMessage());
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
}
}
MQTT Publisher
package com.gbids.mqtt.moquette.main;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ClientLauncher {
private static final String content = "{\"randomData\": 25}";
private static final String willContent = "Client disconnected unexpectedly";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "client";
public static void main(String[] args) throws Exception{
sendDataWithQOSOne();
System.exit(0);
}
private static void sendDataWithQOSOne(){
try {
final String clientId = "client_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false); // for publisher - this is not needed I think
sampleClient.connect(connOpts);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
final String topic = "devices/reported/" + clientId;
sampleClient.publish(topic, message);
System.out.println("Message published from : " + clientId + " with payload of : " + content);
sampleClient.disconnect();
} catch (MqttException me) {
me.printStackTrace();
}
}
}
1 ответ
В вашем случае вам нужно установить retained
флаг для true
при создании MqttMessage
в вашем ClientLauncher
(Издатель). Значением по умолчанию является false
как в документации.
...
message.setRetained(true)
...
Установка этого флага позволяет сохранять сообщения в брокере и отправлять их новым клиентам. Обратите внимание, что брокер хранит только последнее сообщение по теме. Невозможно сохранить более одного сообщения для определенной темы.