如何避免订阅同一主题并在 HiveMQ Android 客户端中多次触发回调?

How to avoid subscribing to the same topic and fire the callback multiple times in HiveMQ Android Client?

预期行为

我想要一个回调来收听我订阅的每个主题,每条消息发送一次。我的意思是,我想订阅一个主题 1000 次,但是当收到一条消息时,我只想听一次。

IDK 如果我做错了什么(我猜)。

实际行为

复制

步骤

  1. 有个话题camera/123
  2. 使用以下名为 subscribeWith
  3. 的方法订阅主题 N 次
  4. 通过 camera/123
  5. 发送消息
  6. 您将收到N次消息,因为您订阅了N次主题

复制代码

只是变量

private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883

构建 MQTT

private fun build() {
        if (mqtt != null) return

        mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { Timber.d("On Connected") }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()
    }

连接MQTT

fun connect(username: String, password: String) {
        build()

        this.username = username
        this.password = password

        mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username("abc")
                ?.password("123".toByteArray())
                ?.applySimpleAuth()
                ?.send()
    }

然后,订阅一个话题 每次我订阅一个主题时,我都会使用这些 fun

fun subscribeWith(topic: String) {
        mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_MOST_ONCE)
               ?.callback { t -> onConsumingTopic(t) }  <- I THINK THIS IS THE IMPORTANT THING
                ?.send()
                ?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }

    }

如评论中所述,目前唯一的解决方案是在 MQTT 客户端库之外保留一份已订阅主题的列表,并在订阅新主题之前检查它。

我找到了正确答案。

不需要为每个订阅调用注册回调,也不需要像这样使用全局数组来处理注册的主题:

mqtt?.subscribeWith()
    ?.callback { t -> onConsumingTopic(t) }  <- This is not needed

相反,您可以为所有消息注册一个“全局”回调,例如:

client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );

然后您可以在不提供回调的情况下订阅。

client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();

完整示例:

构建 MQTT

 mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { onMQTTConnected(it) }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()

        mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }

连接MQTT

mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username(context.getString(R.string.mqtt_user))
                ?.password(context.getString(R.string.mqtt_pw).toByteArray())
                ?.applySimpleAuth()
                ?.send()

订阅主题

mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_LEAST_ONCE)
                ?.send()
                ?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }