SI 订阅多个 mqtt 主题

SI subscription to multiple mqtt topics

我正在尝试学习如何在 Spring-Integration 中处理 MQTT 消息。 已经创建了一个转换器,它为每个 MQTT 主题订阅一个 MqttPahoMessageDrivenChannelAdapter 以消费和转换消息。

问题是我们的数据提供者正计划 "speed-up" 在他这边发布消息。因此,与其拥有几个(<=10)个主题,每个主题都有大约 150 个字段的消息,而是计划将这些字段中的每一个发布到单独的 MQTT 主题。

这意味着我的转换器必须消耗 ca。 1000个mqtt主题,不知是否:

  1. spring-集成对它来说还是一个不错的选择吗。因为 afaik。提到的适配器使用 PAHO MqttClient,它将在一个线程中使用来自它订阅的所有主题的消息,并且创建这些适配器的 1000 个实例是一种矫枉过正。
  2. 如果我们进一步坚持 spring-集成并使用提供的组件,为所有字段创建一个单一的入站适配器是否是个好主意,以前在一个主题的消息中,但将转换从适配器 bean 移到一个单独的 bean(执行转换),该 bean 通过执行器通道连接到适配器,从而在某些线程池上并行执行这些字段的转换。

提前感谢您的回答!

我觉得你的想法很有道理。

为此,您需要实施 直通 MqttMessageConverter 并提供 MqttMessage 作为 payloadtopic作为 header:

public class PassThroughMqttMessageConverter implements MqttMessageConverter {

    @Override
    public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
        return MessageBuilder.withPayload(mqttMessage)
                .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
                .build();
    }

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        return null;
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        return null;
    }

}

因此,在自定义 transformer.

中提到 ExecutorChannel 之后,您真的可以在下游执行目标转换

您也可以考虑实现自定义 MqttPahoClientFactoryDefaultMqttPahoClientFactory 的扩展也可以)并提供自定义 ScheduledExecutorService 以注入到 MqttClient 您将在 getClientInstance().

中创建