spring-integration-mqtt 订阅多个Mqtt Server
spring-integration-mqtt With multiple Mqtt Servers for subscription
我正在使用 Spring 的 spring-integration-mqtt,我可以连接到单个 Mqtt 服务器并可以接收有关订阅主题的消息,现在我想制作一个可以连接到多个 Mqtt 服务器并可以从每个连接接收数据,我想将其管理为动态的,我可以从数据库或文本文件添加更多 Mqtt 服务器。
用于订阅的单个 Mqtt 连接的简单 bean 如下
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
上面的代码为 mqtt 服务器创建了一个连接,并且可以接收消息,如果我为具有不同 Mqtt ip 地址的第二个服务器复制粘贴相同的代码两次,我可以按如下方式连接到两个 Mqtt 服务器
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
@Bean
public MessageProducer inbound2() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
上面的代码也可以正常工作,我可以从两个 Mqtt 服务器接收消息,但是有什么方法可以动态管理它,如下所示,我将 bean 的 return 类型更改为列表,但是没有'成功了:
@Bean
public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel() );
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter.setCompletionTimeout(0);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel() );
logConfList.add(adapter);
logConfList.add(adapter2);
return logConfList;
}
有什么方法可以动态管理这些 bean,我可以从文本文件和 for 循环中获取 mqtt 服务器详细信息,或者我可以管理多个连接的东西。
参见Dynamic and runtime Integration Flows。
@Autowired
private IntegrationFlowContext flowContext;
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
String... topics) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
// more adapter configuration
IntegrationFlow flow = IntegrationFlows.from(adapter)
.channel(channel)
.get();
return this.flowContext.registration(flow).register();
}
private void removeAdapter(IntegrationFlowRegistration flowReg) {
this.flowContext.remove(flowReg.getId());
}
我正在使用 Spring 的 spring-integration-mqtt,我可以连接到单个 Mqtt 服务器并可以接收有关订阅主题的消息,现在我想制作一个可以连接到多个 Mqtt 服务器并可以从每个连接接收数据,我想将其管理为动态的,我可以从数据库或文本文件添加更多 Mqtt 服务器。
用于订阅的单个 Mqtt 连接的简单 bean 如下
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
上面的代码为 mqtt 服务器创建了一个连接,并且可以接收消息,如果我为具有不同 Mqtt ip 地址的第二个服务器复制粘贴相同的代码两次,我可以按如下方式连接到两个 Mqtt 服务器
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
@Bean
public MessageProducer inbound2() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
上面的代码也可以正常工作,我可以从两个 Mqtt 服务器接收消息,但是有什么方法可以动态管理它,如下所示,我将 bean 的 return 类型更改为列表,但是没有'成功了:
@Bean
public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel() );
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter.setCompletionTimeout(0);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel() );
logConfList.add(adapter);
logConfList.add(adapter2);
return logConfList;
}
有什么方法可以动态管理这些 bean,我可以从文本文件和 for 循环中获取 mqtt 服务器详细信息,或者我可以管理多个连接的东西。
参见Dynamic and runtime Integration Flows。
@Autowired
private IntegrationFlowContext flowContext;
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
String... topics) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
// more adapter configuration
IntegrationFlow flow = IntegrationFlows.from(adapter)
.channel(channel)
.get();
return this.flowContext.registration(flow).register();
}
private void removeAdapter(IntegrationFlowRegistration flowReg) {
this.flowContext.remove(flowReg.getId());
}