Оффлайн-сообщения не используются в Moquette с Paho Client

У меня проблема с использованием автономных MQTT-сообщений на сервере Moquette через Paho-клиент Eclipse.

Ниже приведены шаги, которые я выполнил.

  1. Создал и раскрутил брокера Moquette MQTT.
  2. Создано простое потребительское приложение MQTT с использованием клиента eclipse Paho.
  3. Настроить потребителя на использование данных по теме: "устройства / сообщили /#" с QOS: 1 и CleanSession: False
  4. Создан простой издатель данных MQTT для публикации данных в брокере MQTT с использованием Eclipse Paho.
  5. Использовал издатель данных MQTT, чтобы публиковать сообщения в теме: "devices/reports /client_1" с QOS: 1

Вышеуказанные шаги были успешными без каких-либо проблем.

Затем я остановил свое потребительское приложение и отправил данные MQTT брокеру с той же темой. используя мое приложение издателя - сервер смог получить эти сообщения, но в этот момент не было ни одного потребителя, который мог бы использовать это сообщение, так как я остановил своего потребителя. Затем я снова запустил свое потребительское приложение. Он был успешно подключен к брокеру, но не получил ни одного сообщения, которое я отправил брокеру во время отключения потребителя.

Нужно ли выполнять какую-либо конкретную настройку для моего сервера Moquette для сохранения данных (с чистым сеансом: false)? Или я что-то упустил?

Пожалуйста, найдите мой пример кода ниже,

Инициализация сервера Moquette

package com.gbids.mqtt.moquette.main;

import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ServerLauncher {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        final IConfig configs = new MemoryConfig(props);

        final Server mqttBroker = new Server();
        final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
        mqttBroker.startServer(configs, userHandlers);

        System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("stopping moquette mqtt broker..");
                mqttBroker.stopServer();
                System.out.println("moquette mqtt broker stopped");
            }
        });
    }
}

MQTT Consumer

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ConsumerLauncher implements MqttCallback {

    private static final String topicPrefix = "devices/reported";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "consumer";

    public static void main(String[] args) throws MqttException {
        final String clientId = "consumer_1";
        MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topicPrefix + "/#", 1);
        sampleClient.setCallback(new ConsumerLauncher());
    }

    public void connectionLost(Throwable throwable) {
        System.out.println("Consumer connection lost : " + throwable.getMessage());
    }

    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
    }
}

MQTT Publisher

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientLauncher {

    private static final String content = "{\"randomData\": 25}";
    private static final String willContent = "Client disconnected unexpectedly";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "client";

    public static void main(String[] args) throws Exception{
        sendDataWithQOSOne();
        System.exit(0);
    }

    private static void sendDataWithQOSOne(){
        try {
            final String clientId = "client_1";
            MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false); // for publisher - this is not needed I think
            sampleClient.connect(connOpts);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(1);
            final String topic = "devices/reported/" + clientId;
            sampleClient.publish(topic, message);
            System.out.println("Message published from : " + clientId + " with payload of : " + content);
            sampleClient.disconnect();
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }
}

1 ответ

В вашем случае вам нужно установить retained флаг для true при создании MqttMessage в вашем ClientLauncher (Издатель). Значением по умолчанию является false как в документации.

...    
message.setRetained(true)
...

Установка этого флага позволяет сохранять сообщения в брокере и отправлять их новым клиентам. Обратите внимание, что брокер хранит только последнее сообщение по теме. Невозможно сохранить более одного сообщения для определенной темы.

Другие вопросы по тегам