SI 订阅多个 mqtt 主题
SI subscription to multiple mqtt topics
我正在尝试学习如何在 Spring-Integration 中处理 MQTT 消息。
已经创建了一个转换器,它为每个 MQTT 主题订阅一个 MqttPahoMessageDrivenChannelAdapter 以消费和转换消息。
问题是我们的数据提供者正计划 "speed-up" 在他这边发布消息。因此,与其拥有几个(<=10)个主题,每个主题都有大约 150 个字段的消息,而是计划将这些字段中的每一个发布到单独的 MQTT 主题。
这意味着我的转换器必须消耗 ca。 1000个mqtt主题,不知是否:
- spring-集成对它来说还是一个不错的选择吗。因为 afaik。提到的适配器使用 PAHO MqttClient,它将在一个线程中使用来自它订阅的所有主题的消息,并且创建这些适配器的 1000 个实例是一种矫枉过正。
- 如果我们进一步坚持 spring-集成并使用提供的组件,为所有字段创建一个单一的入站适配器是否是个好主意,以前在一个主题的消息中,但将转换从适配器 bean 移到一个单独的 bean(执行转换),该 bean 通过执行器通道连接到适配器,从而在某些线程池上并行执行这些字段的转换。
提前感谢您的回答!
我觉得你的想法很有道理。
为此,您需要实施 直通 MqttMessageConverter
并提供 MqttMessage
作为 payload
和 topic
作为 header:
public class PassThroughMqttMessageConverter implements MqttMessageConverter {
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
return MessageBuilder.withPayload(mqttMessage)
.setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return null;
}
}
因此,在自定义 transformer
.
中提到 ExecutorChannel
之后,您真的可以在下游执行目标转换
您也可以考虑实现自定义 MqttPahoClientFactory
(DefaultMqttPahoClientFactory
的扩展也可以)并提供自定义 ScheduledExecutorService
以注入到 MqttClient
您将在 getClientInstance()
.
中创建
我正在尝试学习如何在 Spring-Integration 中处理 MQTT 消息。 已经创建了一个转换器,它为每个 MQTT 主题订阅一个 MqttPahoMessageDrivenChannelAdapter 以消费和转换消息。
问题是我们的数据提供者正计划 "speed-up" 在他这边发布消息。因此,与其拥有几个(<=10)个主题,每个主题都有大约 150 个字段的消息,而是计划将这些字段中的每一个发布到单独的 MQTT 主题。
这意味着我的转换器必须消耗 ca。 1000个mqtt主题,不知是否:
- spring-集成对它来说还是一个不错的选择吗。因为 afaik。提到的适配器使用 PAHO MqttClient,它将在一个线程中使用来自它订阅的所有主题的消息,并且创建这些适配器的 1000 个实例是一种矫枉过正。
- 如果我们进一步坚持 spring-集成并使用提供的组件,为所有字段创建一个单一的入站适配器是否是个好主意,以前在一个主题的消息中,但将转换从适配器 bean 移到一个单独的 bean(执行转换),该 bean 通过执行器通道连接到适配器,从而在某些线程池上并行执行这些字段的转换。
提前感谢您的回答!
我觉得你的想法很有道理。
为此,您需要实施 直通 MqttMessageConverter
并提供 MqttMessage
作为 payload
和 topic
作为 header:
public class PassThroughMqttMessageConverter implements MqttMessageConverter {
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
return MessageBuilder.withPayload(mqttMessage)
.setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
.build();
}
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return null;
}
}
因此,在自定义 transformer
.
ExecutorChannel
之后,您真的可以在下游执行目标转换
您也可以考虑实现自定义 MqttPahoClientFactory
(DefaultMqttPahoClientFactory
的扩展也可以)并提供自定义 ScheduledExecutorService
以注入到 MqttClient
您将在 getClientInstance()
.