如何避免订阅同一主题并在 HiveMQ Android 客户端中多次触发回调?
How to avoid subscribing to the same topic and fire the callback multiple times in HiveMQ Android Client?
预期行为
我想要一个回调来收听我订阅的每个主题,每条消息发送一次。我的意思是,我想订阅一个主题 1000 次,但是当收到一条消息时,我只想听一次。
IDK 如果我做错了什么(我猜)。
实际行为
- 我正在开发家庭安全摄像头应用程序。
- 我有一个我拥有的相机列表。
- 对于列表中的每个摄像头,我都订阅了一个主题。
- 每隔 30 秒,我更新一次屏幕,并再次为每个相机订阅一个主题。这意味着一个TOPIC可以被多次订阅。
- 每次我收到关于某个主题的消息时,回调都会触发有关同一主题被订阅了多少次的消息。
复制
步骤
- 有个话题camera/123
- 使用以下名为 subscribeWith
的方法订阅主题 N 次
- 通过 camera/123
发送消息
- 您将收到N次消息,因为您订阅了N次主题
复制代码
只是变量
private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883
构建 MQTT
private fun build() {
if (mqtt != null) return
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { Timber.d("On Connected") }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
}
连接MQTT
fun connect(username: String, password: String) {
build()
this.username = username
this.password = password
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username("abc")
?.password("123".toByteArray())
?.applySimpleAuth()
?.send()
}
然后,订阅一个话题
每次我订阅一个主题时,我都会使用这些 fun
fun subscribeWith(topic: String) {
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_MOST_ONCE)
?.callback { t -> onConsumingTopic(t) } <- I THINK THIS IS THE IMPORTANT THING
?.send()
?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
}
如评论中所述,目前唯一的解决方案是在 MQTT 客户端库之外保留一份已订阅主题的列表,并在订阅新主题之前检查它。
我找到了正确答案。
不需要为每个订阅调用注册回调,也不需要像这样使用全局数组来处理注册的主题:
mqtt?.subscribeWith()
?.callback { t -> onConsumingTopic(t) } <- This is not needed
相反,您可以为所有消息注册一个“全局”回调,例如:
client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );
然后您可以在不提供回调的情况下订阅。
client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();
完整示例:
构建 MQTT
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { onMQTTConnected(it) }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }
连接MQTT
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username(context.getString(R.string.mqtt_user))
?.password(context.getString(R.string.mqtt_pw).toByteArray())
?.applySimpleAuth()
?.send()
订阅主题
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_LEAST_ONCE)
?.send()
?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }
预期行为
我想要一个回调来收听我订阅的每个主题,每条消息发送一次。我的意思是,我想订阅一个主题 1000 次,但是当收到一条消息时,我只想听一次。
IDK 如果我做错了什么(我猜)。
实际行为
- 我正在开发家庭安全摄像头应用程序。
- 我有一个我拥有的相机列表。
- 对于列表中的每个摄像头,我都订阅了一个主题。
- 每隔 30 秒,我更新一次屏幕,并再次为每个相机订阅一个主题。这意味着一个TOPIC可以被多次订阅。
- 每次我收到关于某个主题的消息时,回调都会触发有关同一主题被订阅了多少次的消息。
复制
步骤
- 有个话题camera/123
- 使用以下名为 subscribeWith 的方法订阅主题 N 次
- 通过 camera/123 发送消息
- 您将收到N次消息,因为您订阅了N次主题
复制代码
只是变量
private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883
构建 MQTT
private fun build() {
if (mqtt != null) return
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { Timber.d("On Connected") }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
}
连接MQTT
fun connect(username: String, password: String) {
build()
this.username = username
this.password = password
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username("abc")
?.password("123".toByteArray())
?.applySimpleAuth()
?.send()
}
然后,订阅一个话题 每次我订阅一个主题时,我都会使用这些 fun
fun subscribeWith(topic: String) {
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_MOST_ONCE)
?.callback { t -> onConsumingTopic(t) } <- I THINK THIS IS THE IMPORTANT THING
?.send()
?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
}
如评论中所述,目前唯一的解决方案是在 MQTT 客户端库之外保留一份已订阅主题的列表,并在订阅新主题之前检查它。
我找到了正确答案。
不需要为每个订阅调用注册回调,也不需要像这样使用全局数组来处理注册的主题:
mqtt?.subscribeWith()
?.callback { t -> onConsumingTopic(t) } <- This is not needed
相反,您可以为所有消息注册一个“全局”回调,例如:
client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );
然后您可以在不提供回调的情况下订阅。
client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();
完整示例:
构建 MQTT
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { onMQTTConnected(it) }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }
连接MQTT
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username(context.getString(R.string.mqtt_user))
?.password(context.getString(R.string.mqtt_pw).toByteArray())
?.applySimpleAuth()
?.send()
订阅主题
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_LEAST_ONCE)
?.send()
?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }