MQTT Paho Client не переподключается автоматически к брокеру в службе Android

У меня есть Служба для управления моим клиентским подключением MQTT, MQTT работает нормально, но проблема в том, что когда я перезапускаю Broker Server, клиент Android не переподключается. Исключение вызвано на onConnectionLost() Перезвоните.

Заметки

  1. Я использую Moquette Broker на одном компьютере -> Moquette
  2. У меня есть два клиентских приложения для Android: использование сервиса (проблемное) и другое, работающее в потоке без службы (это работает нормально, переподключение в порядке).
  3. Я не могу запустить Android Client MQTT lib, потому что для этого я использую Eclipse Paho MQTT.
  4. Да я делаю setAutomaticReconnect(true);

проблема

Android-приложение, использующее Сервис, работает вечно, не переподключаясь к MQTT Broker.

Код

MQTTService.java

public class MQTTService extends Service implements MqttCallbackExtended {

    boolean running;
private static final String TAG = "MQTTService";

public static final String ACTION_MQTT_CONNECTED = "ACTION_MQTT_CONNECTED";
public static final String ACTION_MQTT_DISCONNECTED = "ACTION_MQTT_DISCONNECTED";
public static final String ACTION_DATA_ARRIVED = "ACTION_DATA_ARRIVED";

// MQTT
MqttClient mqttClient;
final String serverURI = "tcp://"+ServidorServices.IP+":1883";
final String clientId = "Responsavel";
String topicoId;
Thread mqttStartThread;

public boolean subscribe(String topic) {
    try {
        Log.i(TAG,"Subscripe: " + topic);
        mqttClient.subscribe(topic);
        mqttClient.subscribe("LOCATION_REAL");
        return true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

// Life Cycle
@Override
public IBinder onBind(Intent intent) {
    Log.d(TAG,"onBind()");
    return null;
}

@Override
public void onCreate() {
    Log.d(TAG,"onCreate()");
    running = true;
    topicoId = getSharedPreferences("myprefs",MODE_PRIVATE).getString("tag_id_aluno","0");

    mqttStartThread = new MQTTStartThread(this);

    if(topicoId.equals("0")) {
        Log.i(TAG,"Error to subscribe");
        return;
    }

    mqttStartThread.start();
}

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    Log.d(TAG,"onStartCommand()");
    return super.onStartCommand(intent, flags, startId);
}

class MQTTStartThread extends Thread {

    MqttCallbackExtended mqttCallbackExtended;

    public MQTTStartThread(MqttCallbackExtended callbackExtended) {
        this.mqttCallbackExtended = callbackExtended;
    }

    @Override
    public void run() {
        try {
            mqttClient = new MqttClient(serverURI,clientId,new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            mqttClient.setCallback(mqttCallbackExtended);
            mqttClient.connect();
        } catch (Exception e) {
            Log.i(TAG,"Exception MQTT CONNECT: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

@Override
public void onDestroy() {
    Log.d(TAG,"onDestroy()");
    running = false;
    if (mqttClient != null) {
        try {
            if (mqttClient.isConnected()) mqttClient.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@Override
public boolean onUnbind(Intent intent) {
    Log.i(TAG,"onUnbind()");
    return super.onUnbind(intent);
}

// Callbacks MQTT
@Override
public void connectComplete(boolean reconnect, String serverURI) {
    Log.i(TAG,"connectComplete()");
    if (topicoId == null) {
        Log.i(TAG,"Erro ao ler ID da Tag");
        return;
    }
    sendBroadcast(new Intent(ACTION_MQTT_CONNECTED));
    subscribe(topicoId);
}

@Override
public void connectionLost(Throwable cause) {
    Log.i(TAG,"connectionLost(): " + cause.getMessage());
    cause.printStackTrace();
    sendBroadcast(new Intent(ACTION_MQTT_DISCONNECTED));
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    Log.i(TAG,"messageArrived() topic: " + topic);

    if (topic.equals("LOCATION_REAL")) {
        Log.i(TAG,"Data: " + new String(message.getPayload()));
    } else {
        Context context = MQTTService.this;
        String data = new String(message.getPayload());
        Intent intent = new Intent(context,MapsActivity.class);
        intent.putExtra("location",data);
        LatLng latLng = new LatLng(Double.valueOf(data.split("_")[0]),Double.valueOf(data.split("_")[1]));
        String lugar = Utils.getAddressFromLatLng(latLng,getApplicationContext());
        NotificationUtil.create(context,intent,"Embarque",lugar,1);

        if (data.split("_").length < 3) {
            return;
        }

        double latitude = Double.valueOf(data.split("_")[0]);
        double longitude = Double.valueOf(data.split("_")[1]);
        String horario = data.split(" ")[2];

        Intent iMqttBroadcast = new Intent(ACTION_DATA_ARRIVED);
        iMqttBroadcast.putExtra("topico",String.valueOf(topic));
        iMqttBroadcast.putExtra("latitude",latitude);
        iMqttBroadcast.putExtra("longitude",longitude);
        iMqttBroadcast.putExtra("evento","Embarcou");
        iMqttBroadcast.putExtra("horario",horario);

        sendBroadcast(iMqttBroadcast);
    }
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    Log.i(TAG,"deliveryComplete()");
}
}

Исключение Stacktrace

I/MQTTService: connectionLost(): Connection lost
W/System.err: Connection lost (32109) - java.io.EOFException
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
W/System.err:     at java.lang.Thread.run(Thread.java:818)
W/System.err: Caused by: java.io.EOFException
W/System.err:     at java.io.DataInputStream.readByte(DataInputStream.java:77)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
W/System.err:   ... 1 more

2 ответа

Решение

Я думаю, что вы забыли включить MqttConnectOptions с MqttClient объект.

Пожалуйста, попробуйте как следует

mqttClient.connect(options);

вместо

mqttClient.connect();

Надеюсь, это поможет решить проблему повторного подключения.

Как говорится в описании метода.

  options.setAutomaticReconnect(true);

Клиент попытается переподключиться к серверу. Первоначально он будет ждать 1 секунду, прежде чем попытаться повторно подключиться, при каждой неудачной попытке повторного подключения задержка будет удваиваться, пока она не составит 2 минуты, после чего задержка останется равной 2 минутам.

Другой вариант - вы можете управлять интервалом повторных попыток в случае потери соединения.

Другие вопросы по тегам