为什么 MQTT 服务阻塞 activity?
Why is the MQTT service blocking the activity?
我的应用程序在 MainActivity 的 onCreate() 中启动一个服务,该服务会连接到服务器并订阅主题。此外,每当我添加新的连接时,都会重启该服务(使用 stop/startService)。我把连接数据(IP、端口等)存储在 SQL 数据库中,服务启动后会从数据库中读取这些数据。问题是(我认为)当其中某个连接参数不正确时,服务会一直等待到超时,从而阻塞 activity……
如果我改为 token.waitForCompletion(500);
速度会快很多,但我无法确定这个值该取多少……
有什么办法可以解决我的问题吗?
/**
 * Service start-up hook: calls Datapool(), registers the connectivity receiver and
 * derives the device id used when building MQTT client ids.
 *
 * NOTE(review): Datapool() is defined elsewhere; presumably it loads the stored
 * connection data into dataModels - confirm.
 */
@Override
public void onCreate() {
    Datapool();
    // One filter is enough: the previous code built an extra IntentFilter local that was
    // never passed to registerReceiver().
    registerReceiver(mqttBroadcastReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
    mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    deviceId = String.format(DEVICE_ID_FORMAT, Settings.Secure.getString(getContentResolver(), Settings.Secure.ANDROID_ID));
}
/** Receiver instance that onCreate() registers for CONNECTIVITY_ACTION broadcasts. */
MQTTBroadcastReceiver mqttBroadcastReceiver = new MQTTBroadcastReceiver();

/** Triggers a (re)connect attempt for every broadcast it receives. */
class MQTTBroadcastReceiver extends BroadcastReceiver {

    /** Ignores the broadcast payload and simply runs Connect(). */
    @Override
    public void onReceive(Context context, Intent intent) {
        Connect();
    }
}
/** Token of the most recently started connect or subscribe action. */
IMqttToken token;

/** Counter appended to deviceId so that every client created by Connect() gets its own id. */
int i = 0;

/** Connection name -> true once connect and subscribe both finished without an exception. */
private HashMap<String, Boolean> _hashMap = new HashMap<>();

/**
 * Connects to every configured server, subscribes to its topic and reports the
 * per-connection result through sendMessageToActivity().
 *
 * Each connect and each subscribe is awaited with waitForCompletion(2500), so the
 * calling thread is blocked for up to 2500 ms per step and per connection. The method
 * is invoked from MQTTBroadcastReceiver.onReceive().
 */
private void Connect() {
    for (ServiceDataModel connectionData : dataModels) {
        Log.d(TAG, "doConnect() " + connectionData.getCONNECTION_NAME());
        // Start pessimistic; flipped to true only after a successful subscribe below.
        _hashMap.put(connectionData.getCONNECTION_NAME(), false);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        i++;
        try {
            mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i, new MemoryPersistence());
            // Hand the configured options to connect(); they used to be built and then ignored.
            token = mqttClient.connect(options);
            token.waitForCompletion(2500); // blocks the calling thread
            if (mqttClient.isConnected()) {
                mqttClient.setCallback(new MqttEventCallback());
                token = mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()));
                token.waitForCompletion(2500); // blocks the calling thread
                _hashMap.put(connectionData.getCONNECTION_NAME(), true);
            }
        } catch (Exception ex) {
            // Best effort: a failing connection is logged and the loop moves on to the next one.
            Log.d(TAG, ex.toString() + connectionData.toString());
        }
    }
    sendMessageToActivity(_hashMap);
}
通过调用 token.waitForCompletion(2500),您是在先同步连接、再同步订阅——这会阻塞主线程。
不要使用这类 hack,而应使用异步连接回调(下面的 mCallback 和 mConnectionCallback)。连接成功后,再使用异步订阅回调(下面的 mSubscribeCallback):
/**
 * Client-level callback: once the connection is established it starts the
 * asynchronous subscription, whose outcome is delivered to mSubscribeCallback.
 *
 * NOTE(review): mqttClient and connectionData are not declared in this snippet; they
 * must be resolvable wherever this field is placed - confirm.
 */
private final MqttCallbackExtended mCallback = new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        // subscribe() declares a checked exception that connectComplete() cannot propagate,
        // so it has to be handled here; the QoS parse can fail as well.
        try {
            mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()), null, mSubscribeCallback);
        } catch (Exception ex) {
            Log.d(TAG, ex.toString() + connectionData.toString());
        }
    }

    @Override
    public void connectionLost(Throwable ex) {
        // Intentionally empty in this example.
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
        // Intentionally empty in this example.
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Intentionally empty in this example.
    }
};
/**
 * Listener for the asynchronous connect() call. Success needs no handling here
 * because mCallback.connectComplete() takes over; failures are logged.
 */
private final IMqttActionListener mConnectionCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
        // do nothing, this case is handled in mCallback.connectComplete()
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        // A failed connect attempt is reported here instead of blocking the caller,
        // so make it visible rather than dropping it silently.
        Log.d(TAG, "MQTT connect failed: " + exception);
    }
};
/**
 * Listener for the asynchronous subscribe() call: marks the connection as usable
 * in _hashMap on success and logs the cause on failure.
 *
 * NOTE(review): connectionData is not declared in this snippet; it must be
 * resolvable wherever this field is placed - confirm.
 */
private final IMqttActionListener mSubscribeCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken subscribeToken) {
        _hashMap.put(connectionData.getCONNECTION_NAME(), true);
    }

    @Override
    public void onFailure(IMqttToken subscribeToken, Throwable ex) {
        // Keep the failure visible instead of swallowing it.
        Log.d(TAG, "MQTT subscribe failed: " + ex);
    }
};
try {
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
connectOptions.setAutomaticReconnect(false);
connectOptions.setUserName("username");
connectOptions.setPassword("password".toCharArray());
mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
mqttClient.setCallback(mCallback);
mqttClient.connect(connectOptions, null, mConnectionCallback);
} catch (Exception ex) {
Log.d(TAG, ex.toString() + connectionData.toString());
}
因此,只要稍微调整一下应用程序的结构,它就不会再阻塞了。
另外,可以考虑使用 LocalBroadcastManager 在 SQLite、MQTT 和 Activity 之间进行通信。
更新:建议使用 LiveData 和 Room,而不是 LocalBroadcastManager。
我的应用程序在 MainActivity 的 onCreate() 中启动一个服务,该服务会连接到服务器并订阅主题。此外,每当我添加新的连接时,都会重启该服务(使用 stop/startService)。我把连接数据(IP、端口等)存储在 SQL 数据库中,服务启动后会从数据库中读取这些数据。问题是(我认为)当其中某个连接参数不正确时,服务会一直等待到超时,从而阻塞 activity……
如果我改为 token.waitForCompletion(500);
速度会快很多,但我无法确定这个值该取多少……
有什么办法可以解决我的问题吗?
/**
 * Service start-up hook: calls Datapool(), registers the connectivity receiver and
 * derives the device id used when building MQTT client ids.
 *
 * NOTE(review): Datapool() is defined elsewhere; presumably it loads the stored
 * connection data into dataModels - confirm.
 */
@Override
public void onCreate() {
    Datapool();
    // One filter is enough: the previous code built an extra IntentFilter local that was
    // never passed to registerReceiver().
    registerReceiver(mqttBroadcastReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
    mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    deviceId = String.format(DEVICE_ID_FORMAT, Settings.Secure.getString(getContentResolver(), Settings.Secure.ANDROID_ID));
}
/** Receiver instance that onCreate() registers for CONNECTIVITY_ACTION broadcasts. */
MQTTBroadcastReceiver mqttBroadcastReceiver = new MQTTBroadcastReceiver();

/** Triggers a (re)connect attempt for every broadcast it receives. */
class MQTTBroadcastReceiver extends BroadcastReceiver {

    /** Ignores the broadcast payload and simply runs Connect(). */
    @Override
    public void onReceive(Context context, Intent intent) {
        Connect();
    }
}
/** Token of the most recently started connect or subscribe action. */
IMqttToken token;

/** Counter appended to deviceId so that every client created by Connect() gets its own id. */
int i = 0;

/** Connection name -> true once connect and subscribe both finished without an exception. */
private HashMap<String, Boolean> _hashMap = new HashMap<>();

/**
 * Connects to every configured server, subscribes to its topic and reports the
 * per-connection result through sendMessageToActivity().
 *
 * Each connect and each subscribe is awaited with waitForCompletion(2500), so the
 * calling thread is blocked for up to 2500 ms per step and per connection. The method
 * is invoked from MQTTBroadcastReceiver.onReceive().
 */
private void Connect() {
    for (ServiceDataModel connectionData : dataModels) {
        Log.d(TAG, "doConnect() " + connectionData.getCONNECTION_NAME());
        // Start pessimistic; flipped to true only after a successful subscribe below.
        _hashMap.put(connectionData.getCONNECTION_NAME(), false);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        i++;
        try {
            mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i, new MemoryPersistence());
            // Hand the configured options to connect(); they used to be built and then ignored.
            token = mqttClient.connect(options);
            token.waitForCompletion(2500); // blocks the calling thread
            if (mqttClient.isConnected()) {
                mqttClient.setCallback(new MqttEventCallback());
                token = mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()));
                token.waitForCompletion(2500); // blocks the calling thread
                _hashMap.put(connectionData.getCONNECTION_NAME(), true);
            }
        } catch (Exception ex) {
            // Best effort: a failing connection is logged and the loop moves on to the next one.
            Log.d(TAG, ex.toString() + connectionData.toString());
        }
    }
    sendMessageToActivity(_hashMap);
}
通过调用 token.waitForCompletion(2500),您是在先同步连接、再同步订阅——这会阻塞主线程。
不要使用这类 hack,而应使用异步连接回调(下面的 mCallback 和 mConnectionCallback)。连接成功后,再使用异步订阅回调(下面的 mSubscribeCallback):
/**
 * Client-level callback: once the connection is established it starts the
 * asynchronous subscription, whose outcome is delivered to mSubscribeCallback.
 *
 * NOTE(review): mqttClient and connectionData are not declared in this snippet; they
 * must be resolvable wherever this field is placed - confirm.
 */
private final MqttCallbackExtended mCallback = new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        // subscribe() declares a checked exception that connectComplete() cannot propagate,
        // so it has to be handled here; the QoS parse can fail as well.
        try {
            mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()), null, mSubscribeCallback);
        } catch (Exception ex) {
            Log.d(TAG, ex.toString() + connectionData.toString());
        }
    }

    @Override
    public void connectionLost(Throwable ex) {
        // Intentionally empty in this example.
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
        // Intentionally empty in this example.
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Intentionally empty in this example.
    }
};
/**
 * Listener for the asynchronous connect() call. Success needs no handling here
 * because mCallback.connectComplete() takes over; failures are logged.
 */
private final IMqttActionListener mConnectionCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
        // do nothing, this case is handled in mCallback.connectComplete()
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        // A failed connect attempt is reported here instead of blocking the caller,
        // so make it visible rather than dropping it silently.
        Log.d(TAG, "MQTT connect failed: " + exception);
    }
};
/**
 * Listener for the asynchronous subscribe() call: marks the connection as usable
 * in _hashMap on success and logs the cause on failure.
 *
 * NOTE(review): connectionData is not declared in this snippet; it must be
 * resolvable wherever this field is placed - confirm.
 */
private final IMqttActionListener mSubscribeCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken subscribeToken) {
        _hashMap.put(connectionData.getCONNECTION_NAME(), true);
    }

    @Override
    public void onFailure(IMqttToken subscribeToken, Throwable ex) {
        // Keep the failure visible instead of swallowing it.
        Log.d(TAG, "MQTT subscribe failed: " + ex);
    }
};
try {
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true);
connectOptions.setAutomaticReconnect(false);
connectOptions.setUserName("username");
connectOptions.setPassword("password".toCharArray());
mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
mqttClient.setCallback(mCallback);
mqttClient.connect(connectOptions, null, mConnectionCallback);
} catch (Exception ex) {
Log.d(TAG, ex.toString() + connectionData.toString());
}
因此,只要稍微调整一下应用程序的结构,它就不会再阻塞了。
另外,可以考虑使用 LocalBroadcastManager 在 SQLite、MQTT 和 Activity 之间进行通信。
更新:建议使用 LiveData 和 Room,而不是 LocalBroadcastManager。