Paho MQTT cleanSession set to false, yet not receiving messages
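I am testing MQTT for a project. While my client is connected, I can receive messages on the topic it is subscribed to. I have set QoS to 1 and cleanSession to false. But when my client connects again, I do not receive the messages that were sent to the subscribed topic.
In my application, almost all of the work is done by a helper service.
Here is my code: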
AndroidManifest.xml
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.prateek.mqtttest" >

    <uses-permission android:name="android.permission.WAKE_LOCK" />
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:theme="@style/AppTheme" >
        <activity
            android:name=".MainActivity"
            android:label="@string/app_name"
            android:screenOrientation="portrait" >
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />
                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>

        <service
            android:name=".MqttHelperService"
            android:enabled="true"
            android:exported="true" />

        <!-- MqttService -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
    </application>

</manifest>
MainActivity.java
package com.prateek.mqtttest;

import android.app.Activity;
import android.content.Intent;
import android.os.Bundle;

public class MainActivity extends Activity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        startService(new Intent(getBaseContext(), MqttHelperService.class));
    }
}
MqttHelperService.java
package com.prateek.mqtttest;

import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.widget.Toast;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttHelperService extends Service implements MqttCallback {

    private static final String MQTT_URI = "tcp://broker.mqttdashboard.com:1883";
    private static final String CLIENT_ID = "prateek";
    private static final String MQTT_TOPIC = "mqttmessenger";
    private static final int QOS = 1;

    private MqttAndroidClient client;

    public MqttHelperService() {
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Toast.makeText(this, "MQTT Helper Service Started", Toast.LENGTH_SHORT).show();
        new Thread(new Runnable() {
            @Override
            public void run() {
                connect();
            }
        }, "MqttHelperService").start();
        return START_STICKY;
    }

    public class MqttHelperBinder extends Binder {
        public MqttHelperService getService() {
            return MqttHelperService.this;
        }
    }

    public void connect() {
        client = new MqttAndroidClient(this, MQTT_URI, CLIENT_ID);
        client.setCallback(this);
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            client.connect(options, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "connected to MQTT broker", Toast.LENGTH_SHORT).show();
                    subscribe();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "failed to connect: " + throwable.getMessage(), Toast.LENGTH_SHORT).show();
                }
            });
        } catch (MqttException e) {
            Toast.makeText(this, "could not connect to MQTT broker at " + MQTT_URI, Toast.LENGTH_SHORT).show();
        }
    }

    public void subscribe() {
        try {
            IMqttToken token = client.subscribe(MQTT_TOPIC, QOS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "subscription successful", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "subscription failed: " + throwable, Toast.LENGTH_SHORT).show();
                }
            });
        } catch (MqttException e) {
            Toast.makeText(this, "could not subscribe", Toast.LENGTH_SHORT).show();
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        Toast.makeText(this, "connection lost", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        Toast.makeText(this, "message received on topic " + s, Toast.LENGTH_SHORT).show();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Toast.makeText(this, "Service Destroyed", Toast.LENGTH_SHORT).show();
    }
}
I even checked this link, "With the clear session flag set to FALSE, I am missing the published values", but I could not find the mistake in my code.
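The most likely problem is that the messages are being published with QoS=0. Both the subscription and the publish must use QoS > 0 for messages to be queued for a persistent client.
You have confirmed that the messages are published with QoS 1, so the next step is to test with a known working tool to rule out a problem in your code. I tried: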
mosquitto_sub -i prateek -t mqttmessenger -h broker.mqttdashboard.com -v -d -c -q 1
and verified that I could receive messages while connected:
mosquitto_pub -q 1 -t mqttmessenger -m hello2 -h broker.mqttdashboard.com
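I then disconnected the mosquitto_sub client and published another message with the same mosquitto_pub command. Reconnecting mosquitto_sub with the same command produced no messages, just as you are seeing. Repeating the process with test.mosquitto.org as the broker behaves as expected. broker.mqttdashboard.com does not appear to be configured to allow persistent clients.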
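To make the queuing rule above concrete, here is a toy in-memory model of a single client session on a broker. It is only a sketch: `ToySession` and its methods are invented for illustration and are not part of Paho or any broker, and it assumes the common behaviour that QoS 0 messages are never queued for offline clients (the MQTT specification leaves that optional). It also assumes the broker has persistence enabled, which, as the test above suggests, is not guaranteed on a public broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one client's session on a broker. Hypothetical names, not the Paho API.
class ToySession {
    private final boolean cleanSession;
    private final int subscriptionQos;
    private boolean connected = true;
    private final Deque<String> offlineQueue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    ToySession(boolean cleanSession, int subscriptionQos) {
        this.cleanSession = cleanSession;
        this.subscriptionQos = subscriptionQos;
    }

    void disconnect() {
        connected = false;
    }

    void publish(String payload, int publishQos) {
        if (connected) {
            delivered.add(payload);
            return;
        }
        // The effective QoS is the lower of the publish QoS and the subscription QoS.
        int effectiveQos = Math.min(publishQos, subscriptionQos);
        // Assumption: only persistent sessions with effective QoS > 0 get messages queued.
        if (!cleanSession && effectiveQos > 0) {
            offlineQueue.add(payload);
        }
    }

    void reconnect() {
        connected = true;
        // Flush everything that was queued while the client was offline.
        while (!offlineQueue.isEmpty()) {
            delivered.add(offlineQueue.poll());
        }
    }

    // Publishes once while online and once while offline, then reconnects.
    static List<String> run(boolean cleanSession, int subscriptionQos, int publishQos) {
        ToySession session = new ToySession(cleanSession, subscriptionQos);
        session.publish("online", publishQos);
        session.disconnect();
        session.publish("offline", publishQos);
        session.reconnect();
        return session.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(false, 1, 1)); // [online, offline]
        System.out.println(run(false, 1, 0)); // [online]
        System.out.println(run(true, 1, 1));  // [online]
    }
}
```

Only the first case, a persistent session with QoS 1 on both sides, receives the message that was published while the client was offline.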
I ran into the same problem recently. The solution looks simple to me now, but it took me a long time to figure out.
This line is the 'bad' one:
client.connect(mqttOptions, mqqtActionListener);
The 'correct' one is:
client.connect(mqttOptions, null, mqqtActionListener);
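If you call the connect method with 2 arguments, you are using this overload, which treats the first argument as an opaque user context and not as connection options: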
public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException
rather than the correct one:
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException
Hopefully this is your problem too.
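The overload mix-up described above can be reproduced without Android or Paho. The sketch below uses stand-in types (`Options`, `Listener`, `ToyClient`, all hypothetical) that mirror only the shape of the two quoted signatures. It assumes that the two-argument overload falls back to default options, and that the default for cleanSession is true, as it is for Paho's `MqttConnectOptions`.

```java
// Stand-ins that mirror only the shape of the two connect overloads quoted above.
// None of these types are Paho classes.
class OverloadDemo {

    static class Options {
        // Assumption: mirrors the Paho default, where cleanSession is true.
        boolean cleanSession = true;
    }

    interface Listener {
        void onSuccess(String info);
    }

    static class ToyClient {
        boolean lastCleanSession;
        Object lastUserContext;

        // Two-argument overload: the first argument is an opaque user context,
        // so default options are used for the connection.
        void connect(Object userContext, Listener callback) {
            connect(new Options(), userContext, callback);
        }

        // Three-argument overload: the options are actually applied.
        void connect(Options options, Object userContext, Listener callback) {
            lastCleanSession = options.cleanSession;
            lastUserContext = userContext;
            callback.onSuccess("cleanSession=" + options.cleanSession);
        }
    }

    public static void main(String[] args) {
        Options options = new Options();
        options.cleanSession = false;

        ToyClient client = new ToyClient();

        // Compiles without warning, but options is silently treated as the user context.
        client.connect(options, info -> System.out.println("2 args: " + info));

        // Passing null as the user context selects the overload that honours options.
        client.connect(options, null, info -> System.out.println("3 args: " + info));
    }
}
```

Running it prints `2 args: cleanSession=true` followed by `3 args: cleanSession=false`. Because any object is a valid `Object userContext`, the compiler has no reason to reject the two-argument call, which is why the mistake is easy to miss.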