Paho Android 客户端在显示消息时断开连接

Paho Android client drops connection when a message is shown

我正在尝试使用基本的 Eclipse Paho MQTT 客户端版本 1.1.0 连接到 CloudAMQP RabbitMQ 实例、订阅主题并接收消息(我从 Web 管理控制台发送)。

如果应用程序将所有消息有效负载发送到日志输出,则效果很好。

如果应用程序将消息添加到 TextView,消息会出现,但连接会立即断开并且不会收到更多消息。

完整项目可在 GitHub 获得。下面是一个简单的例子。

有一个基于服务的 MQTT Paho 客户端,但我认为对于非常简单的应用程序,基本客户端应该能够在 Android 应用 UI 中接收和显示消息。

...

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Single-screen demo activity that connects to an MQTT broker with the blocking
 * Paho {@link MqttClient}, subscribes to one topic and logs every payload it receives.
 *
 * <p>The activity registers itself as the {@link MqttCallback} of the client, so the
 * three callback methods below are invoked by the Paho library, not by the UI.
 */
public class MainActivity extends AppCompatActivity implements MqttCallback {

    private static final String TAG = "main";

    /** Current broker connection, or {@code null} when the user has disconnected. */
    private Connection connection;
    private Button buttonConnect;
    private TextView messageWindow;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
        setSupportActionBar(toolbar);

        configureUI();
    }

    /**
     * Looks up the views and wires the connect button so that it toggles between
     * "connect and subscribe" and "disconnect".
     */
    private void configureUI() {
        buttonConnect = (Button) findViewById(R.id.buttonConnect);
        messageWindow = (TextView) findViewById(R.id.messageWindow);

        buttonConnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                String server = "***";
                String topic = "test";
                String user = "***";
                String password = "***";

                // Second click while connected: tear the connection down and stop here.
                if (connection != null && connection.isConnected()) {
                    connection.disconnect();
                    connection = null;
                    messageWindow.setText(String.format("Disconnected from server %s", server));
                    return;
                }

                messageWindow.setText(
                        String.format("Connecting to server %s as user %s", server, user));

                connection = new Connection(MainActivity.this, MainActivity.this, server, user, password);
                // NOTE(review): MqttClient is the blocking Paho client, so this call presumably
                // stalls the UI thread until the broker answers - confirm this is acceptable.
                connection.connect();

                if (connection.isConnected()) {
                    messageWindow.append("\n\n");
                    messageWindow.append(
                            String.format("Connected, listening for messages from topic %s", topic));
                    connection.subscribe(topic);
                }
            }
        });
    }

    /**
     * Logs the loss of the broker connection together with its cause.
     *
     * @param cause the reason reported by the client; tolerated as {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Guard against a missing cause and pass the throwable so the stack trace is logged.
        Log.e(TAG, "connectionLost" + (cause == null ? "" : cause.getMessage()), cause);
    }

    /**
     * Logs the payload of every message delivered by the client.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        Log.i(TAG, "Message Arrived: " + msg);
        // NOTE(review): this callback is presumably invoked on a Paho worker thread rather than
        // the UI thread - confirm. Enabling the direct view update below is what makes the
        // connection drop; the update has to be handed to the UI thread instead.
        // messageWindow.append(msg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.i(TAG, "Delivery Complete!");
    }

    /**
     * Thin wrapper around a blocking {@link MqttClient} that logs failures instead of
     * propagating them. If the client cannot be created, every operation becomes a
     * logged no-op and {@link #isConnected()} reports {@code false}.
     */
    class Connection {
        private static final String TAG = "conn";
        private static final String PROTOCOL = "tcp://";
        private static final int PORT = 1883;
        private static final int VERSION = MqttConnectOptions.MQTT_VERSION_3_1_1;
        private static final int KEEP_ALIVE_SECONDS = 20 * 60;

        private final Context context;
        /** Underlying client; {@code null} when construction failed. */
        private final MqttClient client;

        private final String server;
        private final String user;
        private final String pass;

        private final MqttConnectOptions options = new MqttConnectOptions();

        /**
         * Creates the client for {@code tcp://server:1883} and prepares the connect options.
         *
         * @param ctx          Android context kept for the lifetime of the connection
         * @param mqttCallback receiver of connection-lost, message and delivery events
         * @param server       broker host name
         * @param user         account name
         * @param pass         account password
         */
        public Connection(Context ctx, MqttCallback mqttCallback, String server, String user, String pass) {
            this.context = ctx;
            this.server = server;
            this.user = user;
            this.pass = pass;

            MqttClientPersistence memPer = new MemoryPersistence();
            MqttClient created = null;
            try {
                String url = PROTOCOL + server + ":" + PORT;
                created = new MqttClient(url, MqttClient.generateClientId(), memPer);
                created.setCallback(mqttCallback);
            } catch (MqttException e) {
                // Log instead of printStackTrace(); the null checks below keep later calls safe.
                Log.e(TAG, "Client creation failed with reason code = " + e.getReasonCode(), e);
            }
            this.client = created;

            // NOTE(review): the user name is sent as "<user>:<user>" - presumably the broker's
            // "vhost:user" convention with a vhost named after the account; confirm.
            options.setUserName(user + ":" + user);
            options.setPassword(pass.toCharArray());
            options.setMqttVersion(VERSION);
            options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        }

        /** Connects with the prepared options; failures are logged, not thrown. */
        void connect() {
            Log.i(TAG, "buttonConnect");
            if (client == null) {
                Log.e(TAG, "Connection attempt skipped: client was not created");
                return;
            }
            try {
                client.connect(options);
            } catch (MqttException ex) {
                Log.e(TAG, "Connection attempt failed with reason code = " + ex.getReasonCode() + ":" + ex.getCause(), ex);
            }
        }

        /** @return {@code true} only if the client exists and reports an open connection */
        public boolean isConnected() {
            return client != null && client.isConnected();
        }

        /** Disconnects from the broker; failures are logged, not thrown. */
        public void disconnect() {
            if (client == null) {
                return;
            }
            try {
                client.disconnect();
            } catch (MqttException e) {
                Log.e(TAG, "Disconnect failed with reason code = " + e.getReasonCode(), e);
            }
        }

        /**
         * Subscribes to a topic; failures are logged, not thrown.
         *
         * @param dest topic filter to subscribe to
         */
        void subscribe(String dest) {
            if (client == null) {
                Log.e(TAG, "Subscribe skipped: client was not created");
                return;
            }
            try {
                client.subscribe(dest);
            } catch (MqttException e) {
                Log.e(TAG, "Subscribe failed with reason code = " + e.getReasonCode(), e);
            }
        }
    }
}

我猜这是因为您正在尝试从非 UI 线程更新 TextView。messageArrived 回调运行在 Paho 的后台线程上;在该线程中修改视图会抛出异常,而回调中抛出异常会导致客户端关闭连接。

尝试将 messageWindow.append(msg); 包装在 runOnUiThread 调用中。

/**
 * Logs the payload of an incoming message and appends it to the message window.
 *
 * <p>The view update is posted through {@code runOnUiThread} so that it runs on the
 * UI thread instead of the thread that delivers this callback.
 *
 * @param topic   topic the message was published to
 * @param message the received message
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // final: the anonymous Runnable below captures this local, which source levels
    // before Java 8 only allow for variables explicitly declared final.
    final String msg = new String(message.getPayload());
    Log.i(TAG, "Message Arrived: " + msg);
    runOnUiThread(new Runnable() {
        @Override
        public void run() {
            messageWindow.append(msg);
        }
    });
}