多个 MQTT 客户端订阅同一主题
Multi MQTT clients subscribed to the same topic
我实际上有一个 spring 启动应用程序,其中有一个订阅主题的 MQTT 客户端。
我在放置我的应用程序的 2 个实例 (2 containers/pods) 时遇到问题,因为它创建了 2 个到发布者的连接!
问题是我在数据库中记录了每条消息的内容,所以我收到了 2 次数据!一个来自 pod,一个来自第二个 .. 数据库中有 2 条记录...
这是我的实际代码:
.
..
...
....
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(30);
mqttConnectOptions.setKeepAliveInterval(60);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(bean.getProperty("username"));
String password = bean.getProperty("password");
String hostUrl = bean.getProperty("url");
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[] { hostUrl });
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
String clientId = "client-id" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), "jenkins");
adapter.setCompletionTimeout(20000);
adapter.setConverter(new DefaultPahoMessageConverter());
...
..
.
如果你们有解决方案可以在不创建 2 个 MQTT 连接的情况下使用我的应用程序的 2 个 pod。谢谢
您需要使用支持共享订阅的代理(这是添加到 MQTTv5 标准的功能,但一些代理支持 none v3 标准版本)
共享订阅允许一组客户订阅一个主题(或通配符主题),并且发布到该主题的任何给定消息将只传递给一组客户中的一个。
您可以阅读有关共享订阅的更多信息here
我实际上有一个 spring 启动应用程序,其中有一个订阅主题的 MQTT 客户端。
我在放置我的应用程序的 2 个实例 (2 containers/pods) 时遇到问题,因为它创建了 2 个到发布者的连接! 问题是我在数据库中记录了每条消息的内容,所以我收到了 2 次数据!一个来自 pod,一个来自第二个 .. 数据库中有 2 条记录...
这是我的实际代码:
.
..
...
....
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(30);
mqttConnectOptions.setKeepAliveInterval(60);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(bean.getProperty("username"));
String password = bean.getProperty("password");
String hostUrl = bean.getProperty("url");
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[] { hostUrl });
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
String clientId = "client-id" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), "jenkins");
adapter.setCompletionTimeout(20000);
adapter.setConverter(new DefaultPahoMessageConverter());
...
..
.
如果你们有解决方案可以在不创建 2 个 MQTT 连接的情况下使用我的应用程序的 2 个 pod。谢谢
您需要使用支持共享订阅的代理(这是添加到 MQTTv5 标准的功能,但一些代理支持 none v3 标准版本)
共享订阅允许一组客户订阅一个主题(或通配符主题),并且发布到该主题的任何给定消息将只传递给一组客户中的一个。
您可以阅读有关共享订阅的更多信息here