使用 QoS=1 的 MqttAsyncClient 订阅会错过消息

QoS=1 with MqttAsyncClient subscription miss messages

我有充当 MQTT 客户端的前台服务。为此,我正在使用 MqttAsyncClient mqttClient
我正在使用 QoS=1 订阅主题:

mqttClient.subscribe("sensors/s1/", 1);

但如果我的手机离线一段时间,它就会错过离线期间发布的消息。完整代码如下。

在我的另一个应用程序中,我使用的是 MqttAndroidClient mqttAndroidClient,在这种情况下,QoS=1 的订阅能够收到所有错过的消息。

mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {...})

为什么使用 QoS=1 的 MqttAsyncClient 订阅无法检索所有消息?

完整代码:

public class MqttGndService extends Service {

    private String ip="ssl:myserver",port="8887";
    private final IBinder mBinder = new LocalBinder();
    private Handler mHandler;


    private static final String TAG = "mqttservice";
    private static boolean hasWifi = false;
    private static boolean hasMmobile = false;
    private ConnectivityManager mConnMan;
    private volatile IMqttAsyncClient mqttClient;
    private String uniqueID;


    class MQTTBroadcastReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {

            IMqttToken token;
            boolean hasConnectivity = false;
            boolean hasChanged = false;
            NetworkInfo infos[] = mConnMan.getAllNetworkInfo();
            for (int i = 0; i < infos.length; i++) {
                if (infos[i].getTypeName().equalsIgnoreCase("MOBILE")) {
                    if ((infos[i].isConnected() != hasMmobile)) {
                        hasChanged = true;
                        hasMmobile = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v( infos[i].getTypeName() + " is " + infos[i].isConnected());
                } else if (infos[i].getTypeName().equalsIgnoreCase("WIFI")) {
                    if ((infos[i].isConnected() != hasWifi)) {
                        hasChanged = true;
                        hasWifi = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v(infos[i].getTypeName() + " is " + infos[i].isConnected());
                }
            }
            hasConnectivity = hasMmobile || hasWifi;
            Timber.tag(Utils.TIMBER_TAG).v( "hasConn: " + hasConnectivity + " hasChange: " + hasChanged + " - " + (mqttClient == null || !mqttClient.isConnected()));
            if (hasConnectivity && hasChanged && (mqttClient == null || !mqttClient.isConnected())) {
                Timber.tag(Utils.TIMBER_TAG).v("Ready to connect");
                doConnect();
                Timber.tag(Utils.TIMBER_TAG).v("do connect done");

            } else
            {
                Timber.tag(Utils.TIMBER_TAG).v("Connection not possible");
            }



        }
    }


    public class LocalBinder extends Binder {
        public MqttGndService getService() {
            // Return this instance of LocalService so clients can call public methods
            return MqttGndService.this;
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }

    public void publish(String topic, MqttMessage message) {
        SharedPreferences sharedPref = PreferenceManager.getDefaultSharedPreferences(this);// we create a 'shared" memory where we will share our preferences for the limits and the values that we get from onsensorchanged
        try {

            mqttClient.publish(topic, message);

        } catch (MqttException e) {
            e.printStackTrace();
        }

    }


    @Override
    public void onCreate() {
        Timber.tag(Utils.TIMBER_TAG).v("Creating MQTT service");
        mHandler = new Handler();//for toasts
        IntentFilter intentf = new IntentFilter();
        setClientID();
        intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(new MQTTBroadcastReceiver(), new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
        mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    }

    @Override
    public void onConfigurationChanged(Configuration newConfig) {
        Timber.tag(Utils.TIMBER_TAG).v( "onConfigurationChanged()");
        android.os.Debug.waitForDebugger();
        super.onConfigurationChanged(newConfig);

    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Timber.tag(Utils.TIMBER_TAG).v("Service onDestroy");

    }


    private void setClientID() {
        uniqueID = android.provider.Settings.Secure.getString(getContentResolver(), android.provider.Settings.Secure.ANDROID_ID);
        Timber.tag(Utils.TIMBER_TAG).v("uniqueID=" + uniqueID);

    }


    private void doConnect() {
        String broker = ip + ":" + port;
        Timber.tag(Utils.TIMBER_TAG).v("mqtt_doConnect()");
        IMqttToken token;
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setMaxInflight(100);//handle more messages!!so as not to disconnect
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(1000);
        options.setKeepAliveInterval(300);
        options.setUserName("cc50e3e91bf4");
        options.setPassword("b".toCharArray());

        try {
            options.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
        } catch (KeyStoreException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeyManagementException e) {
            e.printStackTrace();
        } catch (CertificateException e) {
            e.printStackTrace();
        } catch (UnrecoverableKeyException e) {
            e.printStackTrace();
        }

        Timber.tag(Utils.TIMBER_TAG).v("set socket factory done");
        try {


            mqttClient = new MqttAsyncClient(broker, uniqueID, new MemoryPersistence());
            token = mqttClient.connect(options);
            token.waitForCompletion(3500);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    try {
                        mqttClient.disconnectForcibly();
                        mqttClient.connect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void messageArrived(String topic, MqttMessage msg) throws Exception {
                    Timber.tag(Utils.TIMBER_TAG).v("Message arrived from topic " + topic+ "  msg: " + msg );



                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("published");
                }
            });
            Timber.tag(Utils.TIMBER_TAG).v("will subscribe");
            mqttClient.subscribe("sensors/s1/", 1);



        } catch (MqttSecurityException e) {
            Timber.tag(Utils.TIMBER_TAG).v("general connect exception");
            e.printStackTrace();
        } catch (MqttException e) {
            switch (e.getReasonCode()) {
                case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE BROKER_UNAVAILABLE!", 1500));
                    break;
                case MqttException.REASON_CODE_CLIENT_TIMEOUT:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CLIENT_TIMEOUT!", 1500));
                    break;
                case MqttException.REASON_CODE_CONNECTION_LOST:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CONNECTION_LOST!", 1500));
                    break;
                case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
                    Timber.tag(Utils.TIMBER_TAG).v( "c " + e.getMessage());
                    e.printStackTrace();
                    break;
                case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
                    Intent i = new Intent("RAISEALLARM");
                    i.putExtra("ALLARM", e);
                    Timber.tag(Utils.TIMBER_TAG).v("b " + e.getMessage());
                    break;
                default:
                    Timber.tag(Utils.TIMBER_TAG).v( "a " + e.getMessage() +" "+ e.toString());
            }
        }
        mHandler.post(new ToastRunnable("WE ARE ONLINE!", 500));

    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand");
        String input = intent.getStringExtra(INTENT_ID);
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand "+ input);

        Intent notificationIntent = new Intent(this, MainActivity.class);
        PendingIntent pendingIntent = PendingIntent.getActivity(this,
                0, notificationIntent, 0);

        Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
                .setContentTitle("Example Service")
                .setContentText(input)
                .setSmallIcon(R.drawable.ic_android)
                .setContentIntent(pendingIntent)
                .build();

        startForeground(1, notification);


        PowerManager powerManager = (PowerManager) getSystemService(POWER_SERVICE);
        PowerManager.WakeLock wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MyApp::MyWakelockTag");
        wakeLock.acquire();


        return START_STICKY;
    }
}

您将 cleanSession 设置成了 true(options.setCleanSession(true))。setCleanSession 的文档中写道:

If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means

  • Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
  • The server will treat a subscription as non-durable

我认为 mqtt specs 表述得更清楚:

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session

因此,当您的应用程序断开连接时,会话将被丢弃,新消息将不会排队等待传递。此外,除非您在连接恢复时重新订阅,否则您将不会收到任何其他消息。

但是请注意,如果您将 cleansession 设置为 false,那么在您的客户端离线期间收到的所有新消息都会排队等待传送(取决于代理的配置);因此,如果客户端可能长时间离线,您也许并不希望出现这种情况。