多次调用 onCreate 时 MQTT 创建多个连接

MQTT Creates Multiple connections when onCreate is called more than once

我正在使用 Paho Android MQTT 客户端创建一个即时消息系统。它的实现按预期工作,但是我遇到了这些错误。

我在 MainActivity 类的 onCreate 中调用 Connection 类(它同时会请求创建到代理的连接)。

现在的问题是,假设我在 MainActivity Class 上,然后按返回键从 MainActivity Class 移动到另一个 activity,然后我又回到 MainActivity Class,将创建另一个代理连接。这意味着无论何时发布一条消息,客户端都会收到两次消息。

MainActivity.java:

public class MainActivity 扩展 AppCompatActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.chat_intera);
    Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
    setSupportActionBar(toolbar);

    dbHelper = DatabaseManager.getInstance(context);
    //mRecyclerView = (RecyclerView) findViewById(R.id.history_recycler_view);

    connections = new Connections();
    connections.createConnectionForPublishing(context);

}


@Override
public void onDestroy() {
    super.onDestroy();
    System.out.println("LOG: Service destroyed");
}

}

Connection.java

/**
 * Helper that opens an MQTT connection through a Paho {@code MqttAndroidClient}.
 * Each call to {@link #createConnectionForPublishing(Context)} builds a new
 * client and issues a new connect request.
 */
public class Connection {

    /**
     * Creates a client bound to the application context, registers the
     * logging callback and starts an asynchronous connect.
     *
     * @param context any context; only its application context is used
     */
    public void createConnectionForPublishing (final Context context) {
        // Build the client first, then attach the callback before connecting.
        mqttAndroidClient = new MqttAndroidClient (context.getApplicationContext (), serverUri, clientId);
        mqttAndroidClient.setCallback (newLoggingCallback ());

        final MqttConnectOptions options = newConnectOptions ();

        try {
            mqttAndroidClient.connect (options, null, newConnectListener ());
        } catch (MqttException e) {
            e.printStackTrace ();
        }
    }

    /** Builds the callback that only logs connection and message events. */
    private MqttCallbackExtended newLoggingCallback () {
        return new MqttCallbackExtended () {

            @Override
            public void connectComplete (boolean reconnect, String serverURI) {
                final String prefix = reconnect ? "Reconnected to : " : "Connected to: ";
                System.out.println (prefix + serverURI);
            }

            @Override
            public void connectionLost (Throwable cause) {
                System.out.println ("The Connection was lost.");
            }

            @Override
            public void messageArrived (String topic, final MqttMessage message) throws Exception {
                System.out.println ("Message received and Arrived");
            }

            @Override
            public void deliveryComplete (IMqttDeliveryToken token) {
                System.out.println ("Message Delivered");
            }
        };
    }

    /** Builds the connect options: MQTT 3.1.1, automatic reconnect, persistent session. */
    private MqttConnectOptions newConnectOptions () {
        final MqttConnectOptions options = new MqttConnectOptions ();
        options.setMqttVersion (MqttConnectOptions.MQTT_VERSION_3_1_1);
        options.setAutomaticReconnect (true);
        options.setCleanSession (false);
        return options;
    }

    /** Builds the listener that reports the outcome of the connect request. */
    private IMqttActionListener newConnectListener () {
        return new IMqttActionListener () {

            @Override
            public void onSuccess (IMqttToken asyncActionToken) {
                System.out.println ("BROKER CONNECTED");

                // The buffer options are configured but not applied: the
                // setBufferOpts call below is commented out.
                DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions ();
                bufferOptions.setBufferEnabled (true);
                bufferOptions.setBufferSize (100);
                bufferOptions.setPersistBuffer (false);
                bufferOptions.setDeleteOldestMessages (false);

                //mqttAndroidClient.setBufferOpts (bufferOptions);
            }

            @Override
            public void onFailure (IMqttToken asyncActionToken, Throwable exception) {
                System.out.println ("Failed to connect to: " + serverUri);
            }
        };
    }

    // ...
}

我是 MQTT 的新手,如果有人能提供帮助,我会很高兴。提前致谢

我不确定,但我认为 null 检查可能会有所帮助

// Create and connect only when no connection object exists yet. The guard
// must test the same variable that is assigned below ("connections").
// NOTE(review): this only prevents duplicates if "connections" outlives the
// activity instance (e.g. a static field) - confirm how it is declared.
if (connections == null) {
   connections = new Connections();
   connections.createConnectionForPublishing(context);
}

然后,如果您不想多次建立连接,则必须将其设计为单例:Connection 类只有 1 个实例,并且只建立一次连接。

因此您的代码将变为:

MainActivity.java:

public class MainActivity extends Activity {

    protected void onCreate (Bundle savedInstanceState) {
        super.onCreate (savedInstanceState);
        setContentView (R.layout.chat_intera);

        Connection connection = Connection.getInstance (getApplicationContext ());
    }

    // ....
}

Connection.java

public class Connection {

    private       static Connection          connInst;
    private       static boolean             connected;

    private       static MqttAndroidClient   mqttAndroidClient;
    private       static IMqttActionListener mqttActionListener;
    private final static MqttConnectOptions  mqttConnectOptions = new MqttConnectOptions (); 
    static {
        mqttConnectOptions.setMqttVersion (MqttConnectOptions.MQTT_VERSION_3_1_1);
        mqttConnectOptions.setAutomaticReconnect (true);
        mqttConnectOptions.setCleanSession (false);
    }

    private Connection (Context context) {
        //Instantiate the mqtt android client class
        mqttAndroidClient = new MqttAndroidClient (context.getApplicationContext (), serverUri, clientId);

        mqttAndroidClient.setCallback (new MqttCallbackExtended () {

            @Override
            public void connectComplete (boolean reconnect, String serverURI) {
                connected = true;
                if (reconnect) {
                    System.out.println ("Reconnected to : " + serverURI);
                } else {
                    System.out.println ("Connected to: " + serverURI);
                }
            }

            @Override
            public void connectionLost (Throwable cause) {
                connected = false;
                System.out.println ("The Connection was lost.");
            }

            @Override
            public void messageArrived (String topic, final MqttMessage message) throws Exception {
                System.out.println ("Message received and Arrived");
            }

            @Override
            public void deliveryComplete (IMqttDeliveryToken token) {
                System.out.println("Message Delivered");
            }
        });

        mqttActionListener = new IMqttActionListener () {

            @Override
            public void onSuccess (IMqttToken asyncActionToken) {
                System.out.println ("BROKER CONNECTED");

                DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions ();
                disconnectedBufferOptions.setBufferEnabled (true);
                disconnectedBufferOptions.setBufferSize (100);
                disconnectedBufferOptions.setPersistBuffer (false);
                disconnectedBufferOptions.setDeleteOldestMessages (false);

                //mqttAndroidClient.setBufferOpts (disconnectedBufferOptions);

            }

            @Override
            public void onFailure (IMqttToken asyncActionToken, Throwable exception) {
                System.out.println ("Failed to connect to: " + serverUri);
            }
        });
    }

    public static Connection getInstance (Context context) {
        if (connInst == null) {
            connInst = new Connection (context);
        }
        createConnectionIfNeeded (context);

        return connInst;
    }

    private static void createConnectionIfNeeded () {
        if (connected) {
            return;
        }

        try {
            mqttAndroidClient.connect (mqttConnectOptions, null, mqttActionListener);

        } catch (MqttException ex) {
            ex.printStackTrace ();
        }
    }

    // ...
}

我建议您使用 Android onPause()

设置标志

https://developer.android.com/reference/android/app/Activity.html#onPause()

然后不使用 onCreate(),而是使用 onResume() 启动您的 MQTT 连接,但首先检查 onPause() 标志。

您可能还需要在 onPause()/onDestroy() 中关闭连接。查看 Android application/activity 的生命周期了解更多信息。