如何在 android 中实现必须在多个活动中访问的 paho MQTT 客户端

How to implement paho MQTT client in android that has to be accessed in multiple activities

我有一个具有多个活动并使用 MQTT 的应用程序。

我在 gradle 依赖项中按如下方式引入了 paho 客户端:

compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.0.3-SNAPSHOT'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.3-SNAPSHOT'

我将使用用户名和密码连接到代理,某些活动将使用不同的用户名和密码。

目前我在每个 activity 中各自处理 connect / subscribe / publish 任务,做法如下面附上的代码所示。随着 activity 越来越多,我遇到了问题。请建议我如何把这些代码迁移到服务(Service)或单例中,使其可以复用并且更高效。

这是其中一项活动

package net.kindows.chitchat;

import android.app.ProgressDialog;
import android.content.Intent;
import android.content.pm.ActivityInfo;
import android.os.Bundle;
import android.os.Handler;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.view.WindowManager;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.pixplicity.easyprefs.library.Prefs;

import net.kindows.SplashScreen;
import net.kindows.common.ApplicationLoader;
import net.kindows.common.utils;
import net.kindows.intlPhone.IntlPhoneInput;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import butterknife.Bind;
import butterknife.ButterKnife;
import de.keyboardsurfer.android.widget.crouton.Crouton;

import static net.kindows.common.ApplicationLoader._toast;

/**
 * Login screen that authenticates a user over MQTT.
 *
 * <p>Flow implemented below: {@link #login()} connects to the broker with the entered phone
 * number and password, subscribes to the per-user reply topic, publishes a sign-in request and
 * then waits for the reply in {@link #messageArrived}. A delayed runnable posted on
 * {@link #han} acts as the login timeout; a second handler, {@link #loginAgain}, re-enables the
 * login button after a short lock-out.
 */
public class LoginActivity extends AppCompatActivity implements MqttCallback {

    private static final String TAG = "LoginActivity";
    private static final int REQUEST_SIGNUP = 0;
    private static final int REQUEST_PAS_RESET = 1;

    // Values of the login state machine kept in `state`.
    private static final int LOGGED_OUT = 0;
    private static final int LOGGING_IN = 1;
    private static final int WAITING_FOR_SING_IN_ACK = 2;
    private static final int LOGGED_IN = 3;
    private static final int VERIFICATION_FAILED = 4;

    private static final String BROKER_URI = "tcp://104.131.50.64:1883";
    private static final String REPLY_TOPIC_PREFIX = "astr/app/iremote/";
    private static final String SIGN_IN_TOPIC = "astr/admin/signin";
    /** How long the login button stays disabled after a login attempt or a failure. */
    private static final long LOGIN_BUTTON_LOCK_MS = 5000;

    @Bind(R.id.input_password)
    EditText _passwordText;
    @Bind(R.id.btn_login)
    Button _loginButton;
    @Bind(R.id.link_signup)
    TextView _signupLink;

    @Bind(R.id.my_phone_input)
    IntlPhoneInput _phoneInputView;
    String sUserName = null;
    String sPassword = null;
    String sDestination = null;
    String sMessage = null;

    private int state = LOGGED_OUT;
    /** Carries the login-timeout runnable posted by {@link #login()}. */
    private Handler han = new Handler();
    private MqttConnectOptions connOpt;
    private ProgressDialog _progressDialog;
    private boolean isMinimized = false;
    private String clientId;
    /** Carries the runnables that re-enable the login button after the lock-out. */
    private Handler loginAgain = new Handler();
    private MqttAndroidClient client;

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_login);
        ButterKnife.bind(this);
        getWindow().setSoftInputMode(WindowManager.LayoutParams.SOFT_INPUT_ADJUST_PAN);
        setRequestedOrientation(ActivityInfo.SCREEN_ORIENTATION_PORTRAIT);

        // The login button follows the validity of the phone number input.
        _loginButton.setEnabled(false);
        _phoneInputView.setOnValidityChange(new IntlPhoneInput.IntlPhoneInputListener() {
            @Override
            public void done(View view, boolean isValid) {
                _loginButton.setEnabled(isValid);
            }
        });
        _loginButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (ApplicationLoader.isConnected(LoginActivity.this, true)) {
                    login();
                }
            }
        });

        _signupLink.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                // Start the Signup activity
                Prefs.putInt(getString(R.string.key_reset_pass), 2);
                Intent intent = new Intent(getApplicationContext(), SignUpActivity.class);
                startActivityForResult(intent, REQUEST_SIGNUP);
            }
        });

        state = LOGGED_OUT;
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);

        clientId = ApplicationLoader.getClientId(LoginActivity.this);
        client = new MqttAndroidClient(this, BROKER_URI, clientId, MqttAndroidClient.Ack.AUTO_ACK);
    }

    @Override
    protected void onStop() {
        super.onStop();
        isMinimized = true;
        closeClient();
        Crouton.cancelAllCroutons();

        // The client is closed, so no sign-in reply can arrive any more. Cancel the pending
        // timeout and drop the dialog so neither of them touches a stopped activity later.
        // The button re-enable runnables on loginAgain are deliberately left pending: they only
        // touch the button, and cancelling them could leave it disabled.
        cancelLoginTimeout();
        dismissProgressDialog();
    }

    @Override
    protected void onStart() {
        super.onStart();
        // Do not go to splash screen if came from signup activity
        if (isMinimized && Prefs.getBoolean(getString(R.string.show_splash), true)) {
            isMinimized = false;
            cancelLoginTimeout();
            startActivity(new Intent(this, SplashScreen.class));
            finish();
        }
        Prefs.putBoolean(getString(R.string.show_splash), true);
        if (utils.getLoginState_login()) {
            cancelLoginTimeout();
            utils._startActivity(this, MainActivity.class);
            finish();
        }
    }

    @Override
    protected void onResume() {
        super.onResume();

        if (utils.getLoginState_login()) {
            cancelLoginTimeout();
            utils._startActivity(this, MainActivity.class);
            finish();
        }

        ApplicationLoader.isConnected(this, true);
    }

    /**
     * Starts a login attempt with the phone number and password currently entered.
     *
     * <p>Locks the login button for {@link #LOGIN_BUTTON_LOCK_MS}, shows a progress dialog,
     * kicks off the MQTT connect/subscribe/publish chain and arms the login timeout.
     */
    public void login() {
        Log.d(TAG, getString(R.string.login));
        _toast(getString(R.string.logging_in), LoginActivity.this);

        closeClient();

        // Disable login button for 5 secs
        final boolean lastLoginState = _loginButton.isEnabled();
        _loginButton.setEnabled(false);
        loginAgain.postDelayed(new Runnable() {
            @Override
            public void run() {
                _loginButton.setEnabled(lastLoginState);
            }
        }, LOGIN_BUTTON_LOCK_MS);

        // A repeated attempt must not inherit the previous attempt's timeout or dialog.
        cancelLoginTimeout();
        dismissProgressDialog();

        _progressDialog = new ProgressDialog(LoginActivity.this,
                R.style.AppTheme_Dark_Dialog);
        String password = _passwordText.getText().toString();
        String numb = _phoneInputView.getNumber().replace("+", "");
        connectMQTT(numb, password);
        _progressDialog.setIndeterminate(true);
        _progressDialog.setMessage("Authenticating...");
        _progressDialog.show();

        // Login timeout: if nothing cancels it first, the attempt is reported as failed.
        han.postDelayed(
                new Runnable() {
                    @Override
                    public void run() {
                        onLoginFailed();
                        dismissProgressDialog();
                    }
                }, ApplicationLoader.timeOUT);
    }

    /**
     * Publishes {@code msg} on {@code topic}; does nothing when {@code client1} is null.
     *
     * @throws MqttException if the client rejects the publish request
     */
    private void publish2MQQT(MqttAndroidClient client1, String topic, String msg) throws MqttException {
        if (client1 == null) {
            return;
        }

        MqttMessage msg2 = new MqttMessage();
        msg2.setPayload(msg.getBytes());
        client1.publish(topic, msg2, this, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                // Nothing to do: the sign-in reply is handled in messageArrived.
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                dismissProgressDialog();
                _toast(describe(exception), LoginActivity.this);
            }
        });
    }

    private void _sendErrorLog(String s) {
        Log.e("LOG", s);
    }

    /**
     * Connects to the broker as {@code user}/{@code pass}, subscribes to the user's reply topic
     * and, once subscribed, publishes the sign-in request.
     */
    private void connectMQTT(final String user, final String pass) {
        Log.e("connectMQTT", "1");

        try {
            connOpt.setUserName(user);
            connOpt.setPassword(pass.toCharArray());
            // The password is intentionally not logged.
            _sendErrorLog("on connteing with " + user);
            // Register the callback before connecting so no reply can be missed.
            client.setCallback(this);
            client.connect(connOpt, this, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    _sendErrorLog("on success of connect");
                    try {
                        client.subscribe(REPLY_TOPIC_PREFIX + user.replace("+", ""), 0, this, new IMqttActionListener() {
                            @Override
                            public void onSuccess(IMqttToken asyncActionToken) {
                                _sendErrorLog("on sucess of subscribe");

                                // TODO: Implement your own authentication logic here.
                                JsonObject msg = new JsonObject();
                                msg.addProperty("u", user);
                                msg.addProperty("P", pass);

                                sUserName = user;
                                sPassword = pass;
                                sDestination = SIGN_IN_TOPIC;
                                sMessage = msg.toString();
                                state = LOGGING_IN;
                                try {
                                    Log.e("register", "publishing signin message");
                                    if (client == null) {
                                        Log.e("register", "publishing register message client is null");
                                    }
                                    publish2MQQT(client, sDestination, sMessage);
                                    state = WAITING_FOR_SING_IN_ACK;
                                } catch (MqttException e) {
                                    Log.e("register", "got exception in publish " + e.toString(), e);
                                    dismissProgressDialog();
                                    _toast(e.getMessage(), LoginActivity.this);
                                }
                            }

                            @Override
                            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                                _sendErrorLog("on failure of subscribe  " + describe(exception));
                                dismissProgressDialog();
                                _toast(describe(exception), LoginActivity.this);
                            }
                        });
                    } catch (MqttException | NullPointerException e) {
                        Log.e(TAG, "subscribe failed", e);
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    _sendErrorLog("on failure of connect" + describe(exception));
                    cancelLoginTimeout();
                    dismissProgressDialog();
                    _toast(describe(exception), LoginActivity.this);
                }
            });
        } catch (MqttException e) {
            Log.e("connectMQTT", "got exception  ::  " + e.toString(), e);
        }
    }

    @Override
    public void connectionLost(Throwable throwable) {
        Log.e("connection", "lost");
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        closeClient();
    }

    /**
     * Handles the broker's reply to the sign-in request. Messages that arrive in any state
     * other than {@link #WAITING_FOR_SING_IN_ACK} are only logged.
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        String msgRecived = new String(mqttMessage.getPayload());
        Log.e("message arrived", "| Message: " + msgRecived);

        if (state != WAITING_FOR_SING_IN_ACK) {
            return;
        }

        // Parse before cancelling the timeout: if the reply is malformed and this throws, the
        // timeout is still armed and will reset the UI.
        JsonObject reply = (JsonObject) new JsonParser().parse(msgRecived);
        int status = reply.get("s").getAsInt();
        cancelLoginTimeout();

        if (status == 200) {
            _toast(getString(R.string.logged_in), LoginActivity.this);
            dismissProgressDialog();
            _phoneInputView.setVisibility(View.GONE);

            _passwordText.setVisibility(View.VISIBLE);
            _loginButton.setText(R.string.logged_in);
            _loginButton.setEnabled(true);
            state = LOGGED_IN;
            utils.storeLoginState(true, sUserName, sPassword);
            closeClient();
            startActivity(new Intent(this, MainActivity.class));
        } else {
            state = VERIFICATION_FAILED;
            utils.storeLoginState(false, "", "");
            // The timeout was just cancelled, so the dialog has to be dismissed here.
            dismissProgressDialog();
            onLoginFailed();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Delivery confirmations are not used by this screen.
    }

    @Override
    protected void onActivityResult(int requestCode, int resultCode, Intent data) {
        // Let the framework dispatch the result to fragments as well.
        super.onActivityResult(requestCode, resultCode, data);
        if (requestCode == REQUEST_SIGNUP && resultCode == RESULT_OK) {
            // TODO: Implement successful signup logic here
            // By default we just finish the Activity and log them in automatically
            this.finish();
        }
    }

    @Override
    public void onBackPressed() {
        // Disable going back to the MainActivity
        moveTaskToBack(true);
    }

    /**
     * Reports a failed login to the user, shows both input fields again and keeps the login
     * button disabled for {@link #LOGIN_BUTTON_LOCK_MS}.
     */
    public void onLoginFailed() {
        _toast(getString(R.string.log_in_failed), LoginActivity.this);
        _passwordText.setVisibility(View.VISIBLE);
        _phoneInputView.setVisibility(View.VISIBLE);
        _loginButton.setEnabled(false);

        // Enable Login after 5000 ms with editing the number
        loginAgain.postDelayed(new Runnable() {
            @Override
            public void run() {
                _loginButton.setEnabled(_phoneInputView.isValid());
            }
        }, LOGIN_BUTTON_LOCK_MS);
    }

    @Override
    public void onPause() {
        super.onPause();
    }

    /** Opens the sign-up activity in password-reset mode. */
    public void resetPass(View view) {
        Prefs.putInt(getString(R.string.key_reset_pass), 1);
        Intent intent = new Intent(getApplicationContext(), SignUpActivity.class);
        startActivityForResult(intent, REQUEST_PAS_RESET);
    }

    /**
     * Cancels the pending login timeout. {@code removeCallbacks(null)} removes nothing, so
     * {@code removeCallbacksAndMessages(null)} is required to actually clear the queue.
     */
    private void cancelLoginTimeout() {
        han.removeCallbacksAndMessages(null);
    }

    /** Dismisses the progress dialog if one exists and is showing. */
    private void dismissProgressDialog() {
        if (_progressDialog != null && _progressDialog.isShowing()) {
            _progressDialog.dismiss();
        }
    }

    /** Closes the MQTT client on a best-effort basis; failures are logged, never thrown. */
    private void closeClient() {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception e) {
            Log.e(TAG, "failed to close MQTT client", e);
        }
    }

    /** Returns the exception's message, or an empty string when there is no exception. */
    private static String describe(Throwable exception) {
        return exception != null ? exception.getMessage() : "";
    }

}

我认为对您来说最好的选择是在 Fragment 中实现这些相同的功能。我不会通读你的整个源代码,因为它很大而且我没有时间阅读,但我会给你一些想法和指导,你可以自己迁移它。

第 1 步:创建 MQTTFragment:

/**
 * Skeleton of a view-less fragment meant to hold all MQTT-related code so that an activity can
 * reach it through the fragment manager. The MQTT logic itself is left as a placeholder.
 */
public class MQTTFragment extends Fragment implements MqttCallback {

   // Tag under which the hosting activity adds and later looks up this fragment.
   public static final String TAG = "MQTTFragment.tag";

   @Override
   public void onCreate (Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
      // Keep this fragment instance from being destroyed during rotation.
      setRetainInstance(true);
   }

   // Do NOT override onCreateView: this fragment has no views.

   // Put ALL the MQTT-related code here.
   // Make public whatever the activity needs to call; keep everything else private.
   // This fragment should also remember the current state of the connection.
   // It can also expose an interface/listener to pass results back to the activity.

   ... your mqtt code

}

第 2 步:在每个需要使用 MQTT 功能的 activity 中加入 MQTTFragment

/**
 * Example host activity: obtains the {@code MQTTFragment} in {@code onCreate}, creating it on
 * first launch and looking it up by tag when the activity is being re-created.
 */
public class MyActivity extends AppCompatActivity {

   private MQTTFragment mqttFragment;

   @Override
   public void onCreate (Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);

      // Re-creation: the fragment was added earlier, so just look it up by its tag.
      if (savedInstanceState != null) {
         mqttFragment = (MQTTFragment) getSupportFragmentManager()
               .findFragmentByTag(MQTTFragment.TAG);
         return;
      }

      // First launch: create the fragment and attach it without a container view.
      mqttFragment = new MQTTFragment();
      getSupportFragmentManager()
            .beginTransaction()
            .add(mqttFragment, MQTTFragment.TAG)
            .commit();
   }

   // From here on the activity can call any MQTT functionality exposed by mqttFragment.

}