MQTT Paho 客户端不会自动重新连接到 Android 服务上的代理

MQTT Paho Client not reconnect automatically to broker on Android Service

我有一个服务来管理我的 MQTT 客户端连接,MQTT 工作正常,但问题是当我重新启动 Broker 服务器时,Android 客户端没有重新连接。connectionLost() 回调被触发,并带有一个异常(见下方的异常堆栈)。

备注

  1. 我在同一台计算机上使用 Moquette Broker -> Moquette
  2. 我有两个 Android 客户端应用程序,一个使用服务(有问题),另一个在线程上工作,没有服务(这工作正常,重新连接没问题)。
  3. 我无法运行 Android 客户端 MQTT 库,所以我使用的是 Eclipse Paho MQTT。
  4. 是的,我已经调用了 setAutomaticReconnect(true);

问题

使用服务(一直在运行)的 Android 应用程序不会重新连接到 MQTT Broker。

代码

MQTTService.java

public class MQTTService extends Service implements MqttCallbackExtended {

    boolean running;
private static final String TAG = "MQTTService";

public static final String ACTION_MQTT_CONNECTED = "ACTION_MQTT_CONNECTED";
public static final String ACTION_MQTT_DISCONNECTED = "ACTION_MQTT_DISCONNECTED";
public static final String ACTION_DATA_ARRIVED = "ACTION_DATA_ARRIVED";

// MQTT
MqttClient mqttClient;
final String serverURI = "tcp://"+ServidorServices.IP+":1883";
final String clientId = "Responsavel";
String topicoId;
Thread mqttStartThread;

public boolean subscribe(String topic) {
    try {
        Log.i(TAG,"Subscripe: " + topic);
        mqttClient.subscribe(topic);
        mqttClient.subscribe("LOCATION_REAL");
        return true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

// Life Cycle
@Override
public IBinder onBind(Intent intent) {
    Log.d(TAG,"onBind()");
    return null;
}

@Override
public void onCreate() {
    Log.d(TAG,"onCreate()");
    running = true;
    topicoId = getSharedPreferences("myprefs",MODE_PRIVATE).getString("tag_id_aluno","0");

    mqttStartThread = new MQTTStartThread(this);

    if(topicoId.equals("0")) {
        Log.i(TAG,"Error to subscribe");
        return;
    }

    mqttStartThread.start();
}

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    Log.d(TAG,"onStartCommand()");
    return super.onStartCommand(intent, flags, startId);
}

class MQTTStartThread extends Thread {

    MqttCallbackExtended mqttCallbackExtended;

    public MQTTStartThread(MqttCallbackExtended callbackExtended) {
        this.mqttCallbackExtended = callbackExtended;
    }

    @Override
    public void run() {
        try {
            mqttClient = new MqttClient(serverURI,clientId,new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            mqttClient.setCallback(mqttCallbackExtended);
            mqttClient.connect();
        } catch (Exception e) {
            Log.i(TAG,"Exception MQTT CONNECT: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

@Override
public void onDestroy() {
    Log.d(TAG,"onDestroy()");
    running = false;
    if (mqttClient != null) {
        try {
            if (mqttClient.isConnected()) mqttClient.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@Override
public boolean onUnbind(Intent intent) {
    Log.i(TAG,"onUnbind()");
    return super.onUnbind(intent);
}

// Callbacks MQTT
@Override
public void connectComplete(boolean reconnect, String serverURI) {
    Log.i(TAG,"connectComplete()");
    if (topicoId == null) {
        Log.i(TAG,"Erro ao ler ID da Tag");
        return;
    }
    sendBroadcast(new Intent(ACTION_MQTT_CONNECTED));
    subscribe(topicoId);
}

@Override
public void connectionLost(Throwable cause) {
    Log.i(TAG,"connectionLost(): " + cause.getMessage());
    cause.printStackTrace();
    sendBroadcast(new Intent(ACTION_MQTT_DISCONNECTED));
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    Log.i(TAG,"messageArrived() topic: " + topic);

    if (topic.equals("LOCATION_REAL")) {
        Log.i(TAG,"Data: " + new String(message.getPayload()));
    } else {
        Context context = MQTTService.this;
        String data = new String(message.getPayload());
        Intent intent = new Intent(context,MapsActivity.class);
        intent.putExtra("location",data);
        LatLng latLng = new LatLng(Double.valueOf(data.split("_")[0]),Double.valueOf(data.split("_")[1]));
        String lugar = Utils.getAddressFromLatLng(latLng,getApplicationContext());
        NotificationUtil.create(context,intent,"Embarque",lugar,1);

        if (data.split("_").length < 3) {
            return;
        }

        double latitude = Double.valueOf(data.split("_")[0]);
        double longitude = Double.valueOf(data.split("_")[1]);
        String horario = data.split(" ")[2];

        Intent iMqttBroadcast = new Intent(ACTION_DATA_ARRIVED);
        iMqttBroadcast.putExtra("topico",String.valueOf(topic));
        iMqttBroadcast.putExtra("latitude",latitude);
        iMqttBroadcast.putExtra("longitude",longitude);
        iMqttBroadcast.putExtra("evento","Embarcou");
        iMqttBroadcast.putExtra("horario",horario);

        sendBroadcast(iMqttBroadcast);
    }
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    Log.i(TAG,"deliveryComplete()");
}
}

Exception Stacktrace

I/MQTTService: connectionLost(): Connection lost
W/System.err: Connection lost (32109) - java.io.EOFException
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
W/System.err:     at java.lang.Thread.run(Thread.java:818)
W/System.err: Caused by: java.io.EOFException
W/System.err:     at java.io.DataInputStream.readByte(DataInputStream.java:77)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
W/System.err:   ... 1 more

如方法描述所述。

  options.setAutomaticReconnect(true);

客户端将尝试重新连接到服务器。它最初会在尝试重新连接之前等待 1 秒,对于每次失败的重新连接尝试,延迟都会加倍,直到达到 2 分钟,此时延迟将保持在 2 分钟。

另一种选择是您可以在发生连接丢失事件时管理重试间隔。

我想你忘了将 MqttConnectOptions 包含在 MqttClient 对象中。

请按以下方式尝试

mqttClient.connect(options);

而不是

mqttClient.connect();

希望它能帮助解决您的重新连接问题。