MQTT Paho Client не переподключается автоматически к брокеру в службе Android
У меня есть Служба для управления моим клиентским подключением MQTT, MQTT работает нормально, но проблема в том, что когда я перезапускаю Broker Server, клиент Android не переподключается. Исключение вызвано на onConnectionLost()
Перезвоните.
Заметки
- Я использую Moquette Broker на одном компьютере -> Moquette
- У меня есть два клиентских приложения для Android: использование сервиса (проблемное) и другое, работающее в потоке без службы (это работает нормально, переподключение в порядке).
- Я не могу запустить Android Client MQTT lib, потому что для этого я использую Eclipse Paho MQTT.
- Да я делаю
setAutomaticReconnect(true);
проблема
Android-приложение, использующее Сервис, работает вечно, не переподключаясь к MQTT Broker.
Код
MQTTService.java
public class MQTTService extends Service implements MqttCallbackExtended {
boolean running;
private static final String TAG = "MQTTService";
public static final String ACTION_MQTT_CONNECTED = "ACTION_MQTT_CONNECTED";
public static final String ACTION_MQTT_DISCONNECTED = "ACTION_MQTT_DISCONNECTED";
public static final String ACTION_DATA_ARRIVED = "ACTION_DATA_ARRIVED";
// MQTT
MqttClient mqttClient;
final String serverURI = "tcp://"+ServidorServices.IP+":1883";
final String clientId = "Responsavel";
String topicoId;
Thread mqttStartThread;
public boolean subscribe(String topic) {
try {
Log.i(TAG,"Subscripe: " + topic);
mqttClient.subscribe(topic);
mqttClient.subscribe("LOCATION_REAL");
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// Life Cycle
@Override
public IBinder onBind(Intent intent) {
Log.d(TAG,"onBind()");
return null;
}
@Override
public void onCreate() {
Log.d(TAG,"onCreate()");
running = true;
topicoId = getSharedPreferences("myprefs",MODE_PRIVATE).getString("tag_id_aluno","0");
mqttStartThread = new MQTTStartThread(this);
if(topicoId.equals("0")) {
Log.i(TAG,"Error to subscribe");
return;
}
mqttStartThread.start();
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
Log.d(TAG,"onStartCommand()");
return super.onStartCommand(intent, flags, startId);
}
class MQTTStartThread extends Thread {
MqttCallbackExtended mqttCallbackExtended;
public MQTTStartThread(MqttCallbackExtended callbackExtended) {
this.mqttCallbackExtended = callbackExtended;
}
@Override
public void run() {
try {
mqttClient = new MqttClient(serverURI,clientId,new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
mqttClient.setCallback(mqttCallbackExtended);
mqttClient.connect();
} catch (Exception e) {
Log.i(TAG,"Exception MQTT CONNECT: " + e.getMessage());
e.printStackTrace();
}
}
}
@Override
public void onDestroy() {
Log.d(TAG,"onDestroy()");
running = false;
if (mqttClient != null) {
try {
if (mqttClient.isConnected()) mqttClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public boolean onUnbind(Intent intent) {
Log.i(TAG,"onUnbind()");
return super.onUnbind(intent);
}
// Callbacks MQTT
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.i(TAG,"connectComplete()");
if (topicoId == null) {
Log.i(TAG,"Erro ao ler ID da Tag");
return;
}
sendBroadcast(new Intent(ACTION_MQTT_CONNECTED));
subscribe(topicoId);
}
@Override
public void connectionLost(Throwable cause) {
Log.i(TAG,"connectionLost(): " + cause.getMessage());
cause.printStackTrace();
sendBroadcast(new Intent(ACTION_MQTT_DISCONNECTED));
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG,"messageArrived() topic: " + topic);
if (topic.equals("LOCATION_REAL")) {
Log.i(TAG,"Data: " + new String(message.getPayload()));
} else {
Context context = MQTTService.this;
String data = new String(message.getPayload());
Intent intent = new Intent(context,MapsActivity.class);
intent.putExtra("location",data);
LatLng latLng = new LatLng(Double.valueOf(data.split("_")[0]),Double.valueOf(data.split("_")[1]));
String lugar = Utils.getAddressFromLatLng(latLng,getApplicationContext());
NotificationUtil.create(context,intent,"Embarque",lugar,1);
if (data.split("_").length < 3) {
return;
}
double latitude = Double.valueOf(data.split("_")[0]);
double longitude = Double.valueOf(data.split("_")[1]);
String horario = data.split(" ")[2];
Intent iMqttBroadcast = new Intent(ACTION_DATA_ARRIVED);
iMqttBroadcast.putExtra("topico",String.valueOf(topic));
iMqttBroadcast.putExtra("latitude",latitude);
iMqttBroadcast.putExtra("longitude",longitude);
iMqttBroadcast.putExtra("evento","Embarcou");
iMqttBroadcast.putExtra("horario",horario);
sendBroadcast(iMqttBroadcast);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.i(TAG,"deliveryComplete()");
}
}
Исключение Stacktrace
I/MQTTService: connectionLost(): Connection lost
W/System.err: Connection lost (32109) - java.io.EOFException
W/System.err: at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
W/System.err: at java.lang.Thread.run(Thread.java:818)
W/System.err: Caused by: java.io.EOFException
W/System.err: at java.io.DataInputStream.readByte(DataInputStream.java:77)
W/System.err: at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
W/System.err: at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
W/System.err: ... 1 more
2 ответа
Я думаю, что вы забыли включить MqttConnectOptions
с MqttClient
объект.
Пожалуйста, попробуйте как следует
mqttClient.connect(options);
вместо
mqttClient.connect();
Надеюсь, это поможет решить проблему повторного подключения.
Как говорится в описании метода.
options.setAutomaticReconnect(true);
Клиент попытается переподключиться к серверу. Первоначально он будет ждать 1 секунду, прежде чем попытаться повторно подключиться, при каждой неудачной попытке повторного подключения задержка будет удваиваться, пока она не составит 2 минуты, после чего задержка останется равной 2 минутам.
Другой вариант - вы можете управлять интервалом повторных попыток в случае потери соединения.