QoS=1 且 MqttAsyncClient 订阅错过消息
QoS=1 with MqttAsyncClient subscription miss messages
我有充当 MQTT 客户端的前台服务。为此,我正在使用 MqttAsyncClient mqttClient
。
我正在使用 QoS=1
订阅主题:
mqttClient.subscribe("sensors/s1/", 1);
但如果我的 phone 离线一段时间,它会错过当前时段的消息。完整代码如下。
我是我正在使用的另一个应用程序 MqttAndroidClient mqttAndroidClient
,在这种情况下,QoS=1 会带来所有错过的消息。
mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {...})
为什么使用 QoS=1 的 MqttAsyncClient
订阅无法检索所有消息?
完整代码:
public class MqttGndService extends Service {
private String ip="ssl:myserver",port="8887";
private final IBinder mBinder = new LocalBinder();
private Handler mHandler;
private static final String TAG = "mqttservice";
private static boolean hasWifi = false;
private static boolean hasMmobile = false;
private ConnectivityManager mConnMan;
private volatile IMqttAsyncClient mqttClient;
private String uniqueID;
class MQTTBroadcastReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
IMqttToken token;
boolean hasConnectivity = false;
boolean hasChanged = false;
NetworkInfo infos[] = mConnMan.getAllNetworkInfo();
for (int i = 0; i < infos.length; i++) {
if (infos[i].getTypeName().equalsIgnoreCase("MOBILE")) {
if ((infos[i].isConnected() != hasMmobile)) {
hasChanged = true;
hasMmobile = infos[i].isConnected();
}
Timber.tag(Utils.TIMBER_TAG).v( infos[i].getTypeName() + " is " + infos[i].isConnected());
} else if (infos[i].getTypeName().equalsIgnoreCase("WIFI")) {
if ((infos[i].isConnected() != hasWifi)) {
hasChanged = true;
hasWifi = infos[i].isConnected();
}
Timber.tag(Utils.TIMBER_TAG).v(infos[i].getTypeName() + " is " + infos[i].isConnected());
}
}
hasConnectivity = hasMmobile || hasWifi;
Timber.tag(Utils.TIMBER_TAG).v( "hasConn: " + hasConnectivity + " hasChange: " + hasChanged + " - " + (mqttClient == null || !mqttClient.isConnected()));
if (hasConnectivity && hasChanged && (mqttClient == null || !mqttClient.isConnected())) {
Timber.tag(Utils.TIMBER_TAG).v("Ready to connect");
doConnect();
Timber.tag(Utils.TIMBER_TAG).v("do connect done");
} else
{
Timber.tag(Utils.TIMBER_TAG).v("Connection not possible");
}
}
}
public class LocalBinder extends Binder {
public MqttGndService getService() {
// Return this instance of LocalService so clients can call public methods
return MqttGndService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}
public void publish(String topic, MqttMessage message) {
SharedPreferences sharedPref = PreferenceManager.getDefaultSharedPreferences(this);// we create a 'shared" memory where we will share our preferences for the limits and the values that we get from onsensorchanged
try {
mqttClient.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onCreate() {
Timber.tag(Utils.TIMBER_TAG).v("Creating MQTT service");
mHandler = new Handler();//for toasts
IntentFilter intentf = new IntentFilter();
setClientID();
intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
registerReceiver(new MQTTBroadcastReceiver(), new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
}
@Override
public void onConfigurationChanged(Configuration newConfig) {
Timber.tag(Utils.TIMBER_TAG).v( "onConfigurationChanged()");
android.os.Debug.waitForDebugger();
super.onConfigurationChanged(newConfig);
}
@Override
public void onDestroy() {
super.onDestroy();
Timber.tag(Utils.TIMBER_TAG).v("Service onDestroy");
}
private void setClientID() {
uniqueID = android.provider.Settings.Secure.getString(getContentResolver(), android.provider.Settings.Secure.ANDROID_ID);
Timber.tag(Utils.TIMBER_TAG).v("uniqueID=" + uniqueID);
}
private void doConnect() {
String broker = ip + ":" + port;
Timber.tag(Utils.TIMBER_TAG).v("mqtt_doConnect()");
IMqttToken token;
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setMaxInflight(100);//handle more messages!!so as not to disconnect
options.setAutomaticReconnect(true);
options.setConnectionTimeout(1000);
options.setKeepAliveInterval(300);
options.setUserName("cc50e3e91bf4");
options.setPassword("b".toCharArray());
try {
options.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
}
Timber.tag(Utils.TIMBER_TAG).v("set socket factory done");
try {
mqttClient = new MqttAsyncClient(broker, uniqueID, new MemoryPersistence());
token = mqttClient.connect(options);
token.waitForCompletion(3500);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
try {
mqttClient.disconnectForcibly();
mqttClient.connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
Timber.tag(Utils.TIMBER_TAG).v("Message arrived from topic " + topic+ " msg: " + msg );
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("published");
}
});
Timber.tag(Utils.TIMBER_TAG).v("will subscribe");
mqttClient.subscribe("sensors/s1/", 1);
} catch (MqttSecurityException e) {
Timber.tag(Utils.TIMBER_TAG).v("general connect exception");
e.printStackTrace();
} catch (MqttException e) {
switch (e.getReasonCode()) {
case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
mHandler.post(new ToastRunnable("WE ARE OFFLINE BROKER_UNAVAILABLE!", 1500));
break;
case MqttException.REASON_CODE_CLIENT_TIMEOUT:
mHandler.post(new ToastRunnable("WE ARE OFFLINE CLIENT_TIMEOUT!", 1500));
break;
case MqttException.REASON_CODE_CONNECTION_LOST:
mHandler.post(new ToastRunnable("WE ARE OFFLINE CONNECTION_LOST!", 1500));
break;
case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
Timber.tag(Utils.TIMBER_TAG).v( "c " + e.getMessage());
e.printStackTrace();
break;
case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
Intent i = new Intent("RAISEALLARM");
i.putExtra("ALLARM", e);
Timber.tag(Utils.TIMBER_TAG).v("b " + e.getMessage());
break;
default:
Timber.tag(Utils.TIMBER_TAG).v( "a " + e.getMessage() +" "+ e.toString());
}
}
mHandler.post(new ToastRunnable("WE ARE ONLINE!", 500));
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
Timber.tag(Utils.TIMBER_TAG).v("onStartCommand");
String input = intent.getStringExtra(INTENT_ID);
Timber.tag(Utils.TIMBER_TAG).v("onStartCommand "+ input);
Intent notificationIntent = new Intent(this, MainActivity.class);
PendingIntent pendingIntent = PendingIntent.getActivity(this,
0, notificationIntent, 0);
Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
.setContentTitle("Example Service")
.setContentText(input)
.setSmallIcon(R.drawable.ic_android)
.setContentIntent(pendingIntent)
.build();
startForeground(1, notification);
PowerManager powerManager = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MyApp::MyWakelockTag");
wakeLock.acquire();
return START_STICKY;
}
}
您正在将 cleansession 设置为 true (options.setCleanSession(true)
);来自 setCleanSession:
的文档
If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
- Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
- The server will treat a subscription as non-durable
我认为 mqtt specs 表述得更清楚:
If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session
因此,当您的应用程序断开连接时,会话将被丢弃,新消息将不会排队等待传递。此外,除非您在连接恢复时重新订阅,否则您将不会收到任何其他消息。
但是请注意,如果您将 cleansession 设置为 false,那么在您的客户端离线时收到的任何新消息都将排队等待传送(取决于代理的配置),如果您不希望发生这种情况,客户端可能会长时间离线
我有充当 MQTT 客户端的前台服务。为此,我正在使用 MqttAsyncClient mqttClient
。
我正在使用 QoS=1
订阅主题:
mqttClient.subscribe("sensors/s1/", 1);
但如果我的 phone 离线一段时间,它会错过当前时段的消息。完整代码如下。
我是我正在使用的另一个应用程序 MqttAndroidClient mqttAndroidClient
,在这种情况下,QoS=1 会带来所有错过的消息。
mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {...})
为什么使用 QoS=1 的 MqttAsyncClient
订阅无法检索所有消息?
完整代码:
public class MqttGndService extends Service {
private String ip="ssl:myserver",port="8887";
private final IBinder mBinder = new LocalBinder();
private Handler mHandler;
private static final String TAG = "mqttservice";
private static boolean hasWifi = false;
private static boolean hasMmobile = false;
private ConnectivityManager mConnMan;
private volatile IMqttAsyncClient mqttClient;
private String uniqueID;
class MQTTBroadcastReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
IMqttToken token;
boolean hasConnectivity = false;
boolean hasChanged = false;
NetworkInfo infos[] = mConnMan.getAllNetworkInfo();
for (int i = 0; i < infos.length; i++) {
if (infos[i].getTypeName().equalsIgnoreCase("MOBILE")) {
if ((infos[i].isConnected() != hasMmobile)) {
hasChanged = true;
hasMmobile = infos[i].isConnected();
}
Timber.tag(Utils.TIMBER_TAG).v( infos[i].getTypeName() + " is " + infos[i].isConnected());
} else if (infos[i].getTypeName().equalsIgnoreCase("WIFI")) {
if ((infos[i].isConnected() != hasWifi)) {
hasChanged = true;
hasWifi = infos[i].isConnected();
}
Timber.tag(Utils.TIMBER_TAG).v(infos[i].getTypeName() + " is " + infos[i].isConnected());
}
}
hasConnectivity = hasMmobile || hasWifi;
Timber.tag(Utils.TIMBER_TAG).v( "hasConn: " + hasConnectivity + " hasChange: " + hasChanged + " - " + (mqttClient == null || !mqttClient.isConnected()));
if (hasConnectivity && hasChanged && (mqttClient == null || !mqttClient.isConnected())) {
Timber.tag(Utils.TIMBER_TAG).v("Ready to connect");
doConnect();
Timber.tag(Utils.TIMBER_TAG).v("do connect done");
} else
{
Timber.tag(Utils.TIMBER_TAG).v("Connection not possible");
}
}
}
public class LocalBinder extends Binder {
public MqttGndService getService() {
// Return this instance of LocalService so clients can call public methods
return MqttGndService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}
public void publish(String topic, MqttMessage message) {
SharedPreferences sharedPref = PreferenceManager.getDefaultSharedPreferences(this);// we create a 'shared" memory where we will share our preferences for the limits and the values that we get from onsensorchanged
try {
mqttClient.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onCreate() {
Timber.tag(Utils.TIMBER_TAG).v("Creating MQTT service");
mHandler = new Handler();//for toasts
IntentFilter intentf = new IntentFilter();
setClientID();
intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
registerReceiver(new MQTTBroadcastReceiver(), new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
}
@Override
public void onConfigurationChanged(Configuration newConfig) {
Timber.tag(Utils.TIMBER_TAG).v( "onConfigurationChanged()");
android.os.Debug.waitForDebugger();
super.onConfigurationChanged(newConfig);
}
@Override
public void onDestroy() {
super.onDestroy();
Timber.tag(Utils.TIMBER_TAG).v("Service onDestroy");
}
private void setClientID() {
uniqueID = android.provider.Settings.Secure.getString(getContentResolver(), android.provider.Settings.Secure.ANDROID_ID);
Timber.tag(Utils.TIMBER_TAG).v("uniqueID=" + uniqueID);
}
private void doConnect() {
String broker = ip + ":" + port;
Timber.tag(Utils.TIMBER_TAG).v("mqtt_doConnect()");
IMqttToken token;
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setMaxInflight(100);//handle more messages!!so as not to disconnect
options.setAutomaticReconnect(true);
options.setConnectionTimeout(1000);
options.setKeepAliveInterval(300);
options.setUserName("cc50e3e91bf4");
options.setPassword("b".toCharArray());
try {
options.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
}
Timber.tag(Utils.TIMBER_TAG).v("set socket factory done");
try {
mqttClient = new MqttAsyncClient(broker, uniqueID, new MemoryPersistence());
token = mqttClient.connect(options);
token.waitForCompletion(3500);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
try {
mqttClient.disconnectForcibly();
mqttClient.connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
Timber.tag(Utils.TIMBER_TAG).v("Message arrived from topic " + topic+ " msg: " + msg );
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("published");
}
});
Timber.tag(Utils.TIMBER_TAG).v("will subscribe");
mqttClient.subscribe("sensors/s1/", 1);
} catch (MqttSecurityException e) {
Timber.tag(Utils.TIMBER_TAG).v("general connect exception");
e.printStackTrace();
} catch (MqttException e) {
switch (e.getReasonCode()) {
case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
mHandler.post(new ToastRunnable("WE ARE OFFLINE BROKER_UNAVAILABLE!", 1500));
break;
case MqttException.REASON_CODE_CLIENT_TIMEOUT:
mHandler.post(new ToastRunnable("WE ARE OFFLINE CLIENT_TIMEOUT!", 1500));
break;
case MqttException.REASON_CODE_CONNECTION_LOST:
mHandler.post(new ToastRunnable("WE ARE OFFLINE CONNECTION_LOST!", 1500));
break;
case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
Timber.tag(Utils.TIMBER_TAG).v( "c " + e.getMessage());
e.printStackTrace();
break;
case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
Intent i = new Intent("RAISEALLARM");
i.putExtra("ALLARM", e);
Timber.tag(Utils.TIMBER_TAG).v("b " + e.getMessage());
break;
default:
Timber.tag(Utils.TIMBER_TAG).v( "a " + e.getMessage() +" "+ e.toString());
}
}
mHandler.post(new ToastRunnable("WE ARE ONLINE!", 500));
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
Timber.tag(Utils.TIMBER_TAG).v("onStartCommand");
String input = intent.getStringExtra(INTENT_ID);
Timber.tag(Utils.TIMBER_TAG).v("onStartCommand "+ input);
Intent notificationIntent = new Intent(this, MainActivity.class);
PendingIntent pendingIntent = PendingIntent.getActivity(this,
0, notificationIntent, 0);
Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
.setContentTitle("Example Service")
.setContentText(input)
.setSmallIcon(R.drawable.ic_android)
.setContentIntent(pendingIntent)
.build();
startForeground(1, notification);
PowerManager powerManager = (PowerManager) getSystemService(POWER_SERVICE);
PowerManager.WakeLock wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MyApp::MyWakelockTag");
wakeLock.acquire();
return START_STICKY;
}
}
您正在将 cleansession 设置为 true (options.setCleanSession(true)
);来自 setCleanSession:
If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
- Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
- The server will treat a subscription as non-durable
我认为 mqtt specs 表述得更清楚:
If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session
因此,当您的应用程序断开连接时,会话将被丢弃,新消息将不会排队等待传递。此外,除非您在连接恢复时重新订阅,否则您将不会收到任何其他消息。
但是请注意,如果您将 cleansession 设置为 false,那么在您的客户端离线时收到的任何新消息都将排队等待传送(取决于代理的配置),如果您不希望发生这种情况,客户端可能会长时间离线