客户端如何知道它是否已经订阅了 MQTT 主题?

How can a client know if it is already subscribed to a MQTT topic?

我正在订阅一个 MQTT 主题(在我的例子中它是应用程序唯一用户 ID)。我正在使用 AWS IOT 核心服务 subscription.Whenever 主屏幕打开并且我从 awsConnectClient 获得了连接回调, 我打电话订阅。现在如果应用程序打开三次会发生什么它订阅了相同的主题 3 time.Now 每当任何消息发布到应用程序收到的 topic.It 3 次。

现在我想知道的是,如果此用户 ID 已从该设备订阅,我将不会从同一设备再次调用订阅。

一种方法可能是如果我在我的应用程序中保存我已经订阅了这个主题并且不再调用订阅。但我怀疑这种方法是否对所有 scenarios.Could 都是正确的,如果有任何 aws iot api 可以告诉我这已经被订阅,我们可能只能从服务器端驱动这个逻辑。

fun connectClick() {
    Log.d(TAG, "clientId = $clientId")

    try {
        mqttManager.connect(clientKeyStore) { status, throwable ->
            Log.d(TAG, "Status = " + status.toString())
            var formattedStatus = String.format(getString(R.string.status_msg),status.toString())

            if (status == AWSIotMqttClientStatusCallback.AWSIotMqttClientStatus.Connected) {
                Log.i(TAG, " subscribed to - " + VoiceXPreference(this).rosterName)
                unsubscribe()
                subscribeClick(VoiceXPreference(this).rosterName)
            }
            runOnUiThread {
                tv_iot_status.text = formattedStatus
                if (throwable != null) {
                    Log.e(TAG, "Connection error.", throwable)
                }
            }
        }
    } catch (e: Exception) {
        Log.e(TAG, "Connection error.", e)
    }

}

以上是我的订阅 code.Although 我总是在订阅前取消订阅,但这对我不起作用。

以下是发出连接请求的 initClient 调用。我添加了 if 检查 mqttManager 是否已经初始化,首先断开连接然后发出连接请求。尽管我已将 initRequest 放入应用程序屏幕的 onCreate() 回调中,但在应用程序打开时仅调用一次。我检查了它只被调用一次的日志。

AWSMobileClient.getInstance().initialize(this, object : Callback<UserStateDetails> {
            override fun onResult(result: UserStateDetails) {
                Log.i(TAG,"connect request called");
                if(mqttManager != null){
                    mqttManager?.disconnect()
                }
                initIoTClient()
            }

            override fun onError(e: Exception) {
                Log.e(TAG, "onError: ", e)
            }
        })

以下是我订阅唯一用户 ID 的订阅代码片段

fun subscribeClick(topic: String) {

    Log.d(TAG, "topic = $topic")

    try {
        mqttManager?.subscribeToTopic(topic, AWSIotMqttQos.QOS0,
            { topic, data ->
                runOnUiThread {
                    try {
                        val message = String(data, Charsets.UTF_8)
                        Log.d(TAG, "Message arrived:")
                        Log.d(TAG, "   Topic: $topic")
                        Log.d(TAG, " Message: $message")

                        val gson = Gson()
                        val notificationModel = gson.fromJson(message, NotificationModel::class.java)
                        var orderServiceMapperResponseModel = OrderServiceMapperResponseModel()
                        orderServiceMapperResponseModel.seatId = notificationModel.seatId
                        orderServiceMapperResponseModel.serviceName = notificationModel.service
                        orderServiceMapperResponseModel.id = notificationModel.id
                        orderServiceMapperResponseModel.createdDate = notificationModel.createdDate
                        serviceList.add(orderServiceMapperResponseModel)
                        if (isPictureInPictureMode) {
                            if (isShownNotification) {
                                updateNotificationCount()
                            } else {
                                updatePIPWindowContent()
                            }
                        } else {
                            updateAdapterDataSource()
                        }

                    } catch (e: UnsupportedEncodingException) {
                        Log.e(TAG, "Message encoding error.", e)
                    }
                }
            })
    } catch (e: Exception) {
        Log.e(TAG, "Subscription error.", e)
    }
}

我也总是在我的应用程序屏幕的 onDestroy() 中发出 disconnect() 请求

mqttManager?.disconnect()

但我仍然收到 3 条订阅消息,而不是 1 条。

您收到 3 条重复的消息不是因为您订阅了 3 次,而是因为您创建了 3 个单独的连接

MQTT specification 明确指出

If a Server receives a SUBSCRIBE Packet containing a Topic Filter that is identical to an existing Subscription’s Topic Filter then it MUST completely replace that existing Subscription with a new Subscription.

意味着永远不会发生每个连接的重复订阅,除非服务器有一个损坏的实现。

您的代码看起来从不发送断开连接请求,而每当调用代码块时都会创建一个新连接。

您应该保留一个 MQTT 会话,或者确保在应用程序关闭时关闭连接。