HiveMQ MQTT 客户端 - 重新订阅自动重新连接的主题
HiveMQ MQTT Client - Resubscribe topics on automatic reconnect
我正在 Spring 中使用 HiveMQ MQTT 客户端接收 MQTT 消息。
我的客户端配置是这样的
public Mqtt3AsyncClient mqtt3Client() {
var mqtt3Client = Mqtt3Client.builder()
.serverHost("my.host")
.sslWithDefaultConfig()
.serverPort(0000)
.automaticReconnectWithDefaultConfig()
.buildBlocking();
mqtt3Client.connect();
return mqtt3Client.toAsync();
}
客户端可用后,使用客户端初始化另一个Spring Bean。它订阅了一个主题:
@PostConstruct
public void subscribeTopic() {
mqtt3AsyncClient.subscribeWith()
.topicFilter("topicfilter")
.qos(MqttQos.AT_LEAST_ONCE)
.callback(message -> {
/*Handle message*/
})
.send()
.whenComplete((mqtt3SubAck, throwable) -> {
if (throwable != null) {
/*Logging*/
} else {
/*Logging*/
}
});
}
我多次看到没有更多的消息发送到我的应用程序,而我仍然能够使用客户端连接发送消息(因此当时已连接)。
我找不到任何关于 HiveMQ MQTT 客户端如何处理已配置 automaticReconnectWithDefaultConfig()
的文档。谁能指出,我在 subscribeTopic()
中创建的订阅是否被重新订阅?
我还找到了可以替代 .topicFilter(..).qos(...)
部分的方法 addSubscription()
。我也找不到任何信息这是否使订阅对连接丢失更有弹性。
如果能提供有关该主题的任何信息,我将不胜感激。
谢谢。
目前,如果代理在重新连接的 ConAck 中报告现有会话,HiveMQ MQTT 客户端将仅继续接收订阅消息。这需要两件事 - 1) 您需要在初始连接时设置 cleanSession = false,以及 2) 代理需要在连接之间没有丢失会话。
对于 1),您可以尝试将其添加到您的连接中:
client.connectWith().cleanSession(false).send();
对于 2),这将取决于代理以及连接丢失的原因。如果是 'just' 网络中断并且代理在后台 运行 正常,那么它应该可以正常工作。如果代理崩溃并重新启动,则需要代理配置持久性并且能够在重新启动后重新建立会话。
实际上在 HiveMQ MQTT 客户端的 github 项目页面上有一些关于这个问题的讨论,以及是否应该添加功能以自动重新订阅,即使在没有预先存在的情况下会话被发现。并且还有一个相关的说明,即使在重新连接后没有找到会话,连接丢失时完成的任何发布是否应该自动发布。如果这些是您需要的功能,也许可以跳到那里参与讨论 :)
最后,您还可以通过在构建客户端时添加 MqttClientConnectedListener 来手动执行重新订阅,客户端可以在每次自动重新连接时重新创建订阅。
HTH
干杯,
C
我正在 Spring 中使用 HiveMQ MQTT 客户端接收 MQTT 消息。
我的客户端配置是这样的
public Mqtt3AsyncClient mqtt3Client() {
var mqtt3Client = Mqtt3Client.builder()
.serverHost("my.host")
.sslWithDefaultConfig()
.serverPort(0000)
.automaticReconnectWithDefaultConfig()
.buildBlocking();
mqtt3Client.connect();
return mqtt3Client.toAsync();
}
客户端可用后,使用客户端初始化另一个Spring Bean。它订阅了一个主题:
@PostConstruct
public void subscribeTopic() {
mqtt3AsyncClient.subscribeWith()
.topicFilter("topicfilter")
.qos(MqttQos.AT_LEAST_ONCE)
.callback(message -> {
/*Handle message*/
})
.send()
.whenComplete((mqtt3SubAck, throwable) -> {
if (throwable != null) {
/*Logging*/
} else {
/*Logging*/
}
});
}
我多次看到没有更多的消息发送到我的应用程序,而我仍然能够使用客户端连接发送消息(因此当时已连接)。
我找不到任何关于 HiveMQ MQTT 客户端如何处理已配置 automaticReconnectWithDefaultConfig()
的文档。谁能指出,我在 subscribeTopic()
中创建的订阅是否被重新订阅?
我还找到了可以替代 .topicFilter(..).qos(...)
部分的方法 addSubscription()
。我也找不到任何信息这是否使订阅对连接丢失更有弹性。
如果能提供有关该主题的任何信息,我将不胜感激。
谢谢。
目前,如果代理在重新连接的 ConAck 中报告现有会话,HiveMQ MQTT 客户端将仅继续接收订阅消息。这需要两件事 - 1) 您需要在初始连接时设置 cleanSession = false,以及 2) 代理需要在连接之间没有丢失会话。
对于 1),您可以尝试将其添加到您的连接中:
client.connectWith().cleanSession(false).send();
对于 2),这将取决于代理以及连接丢失的原因。如果是 'just' 网络中断并且代理在后台 运行 正常,那么它应该可以正常工作。如果代理崩溃并重新启动,则需要代理配置持久性并且能够在重新启动后重新建立会话。
实际上在 HiveMQ MQTT 客户端的 github 项目页面上有一些关于这个问题的讨论,以及是否应该添加功能以自动重新订阅,即使在没有预先存在的情况下会话被发现。并且还有一个相关的说明,即使在重新连接后没有找到会话,连接丢失时完成的任何发布是否应该自动发布。如果这些是您需要的功能,也许可以跳到那里参与讨论 :)
最后,您还可以通过在构建客户端时添加 MqttClientConnectedListener 来手动执行重新订阅,客户端可以在每次自动重新连接时重新创建订阅。
HTH
干杯,
C