AWS IoT Android application over MQTT throws MqttException (0) - java.io.IOException: Already connected

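I am trying to use 'Authenticate using Cognito-Identity with Cognito user pool' in my Android application. My Cognito user pool authentication works well when I run it on its own, and I can see the JWT token. When I run the 'PubSub' sample application with the unauthenticated role, it works as expected. When I integrate both features in one application, the application throws the following error.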

W/System.err: MqttException (0) - java.io.IOException: Already connected
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
W/System.err:     at java.lang.Thread.run(Thread.java:761)
W/System.err: Caused by: java.io.IOException: Already connected
W/System.err:     at java.io.PipedOutputStream.connect(PipedOutputStream.java:100)
W/System.err:     at java.io.PipedInputStream.connect(PipedInputStream.java:195)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketReceiver.<init>(WebSocketReceiver.java:42)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule.start(WebSocketSecureNetworkModule.java:78)
W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:650)
W/System.err:   ... 1 more

I have been trying to resolve this since last Thursday and am still stuck in the same place. I really have no idea where I should look.

I am adding my authentication (Cognito user pool authentication) activity and my connection activity.

AmazonCognitoIdentityProviderClient identityProviderClient =
        new AmazonCognitoIdentityProviderClient(new AnonymousAWSCredentials(), new ClientConfiguration());
identityProviderClient.setRegion(Region.getRegion(Regions.US_WEST_2));
CognitoUserPool userPool = new CognitoUserPool(getApplicationContext(), "us-west-2_ghtcc6ho9", "4t0mk45hNso69dp2j4jvel5ghm", "1jmq0lhhq721oif9k6nug31c29i760vihua8hvrgu5umfr2a1vd7", identityProviderClient);
cogUser = userPool.getUser();
authenticationHandler = new AuthenticationHandler() {

        @Override
        public void onSuccess(CognitoUserSession userSession, CognitoDevice newDevice) {
            String ids = userSession.getIdToken().getJWTToken();
            Log.d("MyToken","session id___"+userSession.getIdToken().getExpiration()+"___"+userSession.getIdToken().getIssuedAt());
            Intent pubSub = new Intent(MainActivity.this, PubSubActivity.class);
            pubSub.putExtra("token",""+ids);
            startActivity(pubSub);
            //MainActivity.this.finish();

        }

        @Override
        public void getAuthenticationDetails(AuthenticationContinuation authenticationContinuation, String userId) {
            Log.d("MyToken","getAuthenticationDetails");
            AuthenticationDetails authenticationDetails = new AuthenticationDetails("shone", "172737", null);
            authenticationContinuation.setAuthenticationDetails(authenticationDetails);
            // Allow the sign-in to continue
            authenticationContinuation.continueTask();
        }

        @Override
        public void getMFACode(MultiFactorAuthenticationContinuation multiFactorAuthenticationContinuation) {
            Log.d("MyToken","getMFACode");
            multiFactorAuthenticationContinuation.continueTask();
        }

        @Override
        public void authenticationChallenge(ChallengeContinuation continuation) {
            Log.d("MyToken","authenticationChallenge"+continuation.getChallengeName());
            newPasswordContinuation.continueTask();
        }

        @Override
        public void onFailure(Exception exception) {
            exception.printStackTrace();
            Log.d("MyToken","onFailure");
        }
    };
cogUser.getSessionInBackground(authenticationHandler);

When it reaches 'onSuccess', I launch my connection activity and pass my session token along with the Intent. Moving on to the next activity:

private static final String COGNITO_POOL_ID = "us-west-2:a153a090-508c-44c0-a9dd-efd450298c4b";
private static final Regions MY_REGION = Regions.US_WEST_2;
AWSIotMqttManager mqttManager;
String clientId;
AWSCredentials awsCredentials;
CognitoCachingCredentialsProvider credentialsProvider;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    Intent intent = getIntent();
    if (null == intent) {
        Toast.makeText(getApplicationContext(), "Token is null", Toast.LENGTH_SHORT).show();
    } else {
        token = intent.getStringExtra("token");
    }
    clientId = UUID.randomUUID().toString();
    credentialsProvider = new CognitoCachingCredentialsProvider(
            getApplicationContext(),
            COGNITO_POOL_ID,
            MY_REGION
    );
    mqttManager = new AWSIotMqttManager(clientId, CUSTOMER_SPECIFIC_ENDPOINT);
    Map<String, String> loginsMap = new HashMap<String, String>();
    loginsMap.put("cognito-idp.us-west-2.amazonaws.com/us-west-2_ghtcc6ho9", token);
    credentialsProvider.setLogins(loginsMap);
    Log.d("SESSION_ID", "" + token);
    new Thread(new Runnable() {
        @Override
        public void run() {
            credentialsProvider.refresh();
            awsCredentials = credentialsProvider.getCredentials();
            Log.d("SESSION_ID B: ", "" + awsCredentials.getAWSAccessKeyId());
            Log.d("SESSION_ID C: ", "" + awsCredentials.getAWSSecretKey());
        }
    }).start();
}

View.OnClickListener connectClick = new View.OnClickListener() {
    @Override
    public void onClick(View v) {

        Log.d(LOG_TAG, "clientId = " + clientId);

        try {
            mqttManager.connect(credentialsProvider, new AWSIotMqttClientStatusCallback() {
                @Override
                public void onStatusChanged(final AWSIotMqttClientStatus status,
                        final Throwable throwable) {
                    Log.d(LOG_TAG, "Status = " + String.valueOf(status)+"______"+((null !=throwable)?throwable.getMessage():""));

                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            if (status == AWSIotMqttClientStatus.Connecting) {
                                tvStatus.setText("Connecting...");

                            } else if (status == AWSIotMqttClientStatus.Connected) {
                                tvStatus.setText("Connected");

                            } else if (status == AWSIotMqttClientStatus.Reconnecting) {
                                if (throwable != null) {
                                    Log.e(LOG_TAG, "Connection error.", throwable);
                                }
                                tvStatus.setText("Reconnecting");
                            } else if (status == AWSIotMqttClientStatus.ConnectionLost) {
                                if (throwable != null) {
                                    Log.e(LOG_TAG, "Connection error.", throwable);
                                    throwable.printStackTrace();
                                }
                                tvStatus.setText("Disconnected");
                            } else {
                                tvStatus.setText("Disconnected");

                            }
                        }
                    });
                }
            });
        } catch (final Exception e) {
            Log.e(LOG_TAG, "Connection error.", e);
        }
    }
};

What is wrong with my code? Why does it throw an exception when the MQTT connect is invoked? Any help would be appreciated.

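When a client connects to the broker, it does so with a unique client ID. This error occurs if a client tries to connect with a client ID that is already in use. Use different client IDs, such as foo1, foo2, foo3, and so on.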
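A minimal sketch of generating a distinct client ID per connection, assuming a random suffix is acceptable for your setup. `ClientIds` and `unique` are hypothetical names, not part of the AWS SDK; the question's code already does the equivalent with `UUID.randomUUID().toString()`.

```java
import java.util.UUID;

// Hypothetical helper: builds a client ID that is different on every call.
final class ClientIds {
    private ClientIds() {}

    // Returns "<prefix>-<random UUID>", e.g. a prefix identifying the app.
    static String unique(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }
}
```

The result could then be passed as the first argument of `new AWSIotMqttManager(clientId, CUSTOMER_SPECIFIC_ENDPOINT)`, as in the question.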

I struggled with this for almost a week.

The complete procedure -> after a successful login you get a JWT token

String idToken = cognitoUserSession.getIdToken().getJWTToken();

Put it into a map

Map<String, String> logins = new HashMap<String, String>(); 
//fill it with Cognito User token
logins.put("cognito-idp.<REGION>.amazonaws.com/<COGNITO_USER_POOL_ID>", idToken);
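The map key above follows the pattern `cognito-idp.<REGION>.amazonaws.com/<COGNITO_USER_POOL_ID>`. A small sketch of building it without hand-typing the string follows; `CognitoLogins`, `providerKey`, and `forUserPool` are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for building the logins map of a Cognito user pool.
final class CognitoLogins {
    private CognitoLogins() {}

    // Builds the provider key, e.g. "cognito-idp.us-west-2.amazonaws.com/<pool id>".
    static String providerKey(String region, String userPoolId) {
        return "cognito-idp." + region + ".amazonaws.com/" + userPoolId;
    }

    // Returns a one-entry map from the provider key to the user's ID token.
    static Map<String, String> forUserPool(String region, String userPoolId, String idToken) {
        Map<String, String> logins = new HashMap<String, String>();
        logins.put(providerKey(region, userPoolId), idToken);
        return logins;
    }
}
```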

Then set it in two places (this is not stated in any documentation!)

CognitoCachingCredentialsProvider credentialsProvider =
        new CognitoCachingCredentialsProvider(context, IDENTITY_POOL_ID, REGION);
credentialsProvider.setLogins(logins);

AmazonCognitoIdentity cognitoIdentity = new AmazonCognitoIdentityClient(credentialsProvider);
GetIdRequest getIdReq = new GetIdRequest();
getIdReq.setLogins(logins); //or if you have already set provider logins just use credentialsProvider.getLogins()
getIdReq.setIdentityPoolId(COGNITO_POOL_ID);
GetIdResult getIdRes = cognitoIdentity.getId(getIdReq);

After that you need to make one more call

AttachPrincipalPolicyRequest attachPolicyReq = new AttachPrincipalPolicyRequest(); //the docs call it AttachPolicyRequest, but that is wrong
attachPolicyReq.setPolicyName("allAllowed"); //name of your AWS IoT policy
attachPolicyReq.setPrincipal(getIdRes.getIdentityId());
new AWSIotClient(credentialsProvider).attachPrincipalPolicy(attachPolicyReq);

Only after that can you enable the connect button and continue like this

mqttManager.connect(credentialsProvider, new AWSIotMqttClientStatusCallback() {

I really spent a lot of time on this small piece of code...
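The ordering described in this answer (finish the identity and policy setup first, only then allow connecting) can be sketched in plain Java. This is an illustration under assumptions, not SDK code: `ConnectGate` is a hypothetical class, and the `setup` runnable stands in for the blocking `getId` and `attachPrincipalPolicy` calls, which should not run on the Android main thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical gate: connect is refused until the background setup has finished.
final class ConnectGate {
    private final CountDownLatch ready = new CountDownLatch(1);

    // Runs the blocking setup on a background thread.
    // If setup throws, the gate stays closed.
    void prepare(final Runnable setup) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                setup.run();
                ready.countDown();
            }
        }).start();
    }

    // Waits up to timeoutMillis for setup to finish; returns true if it did.
    boolean awaitReady(long timeoutMillis) {
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs connect only if setup has completed; returns whether it ran.
    boolean connectIfReady(Runnable connect) {
        if (ready.getCount() > 0) {
            return false;
        }
        connect.run();
        return true;
    }
}
```

In an activity, the click listener would call `connectIfReady` with a runnable that wraps `mqttManager.connect(...)`, and the button would be enabled once setup has finished.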

I was getting the same error as well -

Feb 27, 2019 10:23:09 AM com.amazonaws.services.iot.client.mqtt.AwsIotMqttConnectionListener onFailure
WARNING: Connect request failure
MqttException (0) - java.io.IOException: Already connected
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Already connected
    at java.io.PipedOutputStream.connect(PipedOutputStream.java:100)

But the problem was different.

First of all, you do not need to call attachPrincipalPolicy from code. You can use the command line as well. You can do something like -

aws iot attach-principal-policy --principal us-east-1:1c973d17-98e6-4df6-86bf-d5cedc1fbc0d --policy-name "thingpolicy" --region us-east-1 --profile osfg

You get the principal ID from the identity browser of your identity pool. Now let's come to the error -

To connect to MQTT successfully with authenticated Cognito credentials, you need 2 correct policies -

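  1. The auth role corresponding to your identity pool should allow all MQTT operations.
  2. The AWS IoT policy should allow the same operations, and you need to associate your Cognito identity with this policy. We use attachPrincipalPolicy to do so.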

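If either step is missed, we get the above error. I agree that the error is misleading - Already connected makes no sense to me. I would normally assume it has to do with the clientId, which should be unique. But anyway, hopefully the AWS folks will improve this at some point.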

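In my particular case, the problem was point 1. Although my IoT policy had all the required permissions, the auth role corresponding to the identity pool did not. So make sure you do that.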
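As an illustration of what "allow all MQTT operations" could look like, here is a sketch of a permissive policy document. It is an assumption for testing only: `"Resource": "*"` allows these actions for every client ID and topic, and a real policy would normally restrict it. The same `iot:` actions would also need to be allowed by the auth role of the identity pool.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect",
        "iot:Publish",
        "iot:Subscribe",
        "iot:Receive"
      ],
      "Resource": "*"
    }
  ]
}
```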

I have also created a YouTube video to show this: https://www.youtube.com/watch?v=j2KJVHGHaFc