我们如何使用 spring 集成让不同的处理程序订阅来自不同主题的消息?

How can we have different handlers to subscribe messages from different topics using spring Integration?

我编写了我的第一个 spring 集成应用程序,它使用 mqtt 代理订阅来自设备的不同主题的消息。设备正在发布消息,客户端(代码)正在使用相同的主题访问这些消息。

我添加了一个处理程序来访问来自代理的消息,并在 类 中进一步使用它。现在,就我而言,我想为不同的主题设置不同的处理程序,以便它们都可以映射到不同的 VO 类 并在业务逻辑中进一步使用它。

据我所知,我只想创建一个到代理的连接,一个通道,但可能会出现不同的主题,它们应该在同一连接的不同处理程序中处理。我怎样才能做到这一点?

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
//        SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
        SpringApplication.run(MqttJavaApplication.class,args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId,"camera/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
//        adapter.setOutputChannelName("mqttInputChannel");
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    public IntegrationFlow mqttInFlow() {
        System.out.println(Arrays.stream(SubScribeMessages.class.getMethods()).findFirst());
        return IntegrationFlows.from(inbound())
                .transform(p -> p)
                .handle("addTopics","handlHere")
                .get();
    }

    @Component
    public class MyService{

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        @Bean
        public String addTopics()
        {
            if(adapter.getTopic().length>0)
            {
                adapter.addTopic("camera/+/counts"); //new topic 
                adapter.addTopic("camera/+/live_counts"); //new topic
            }
            return "";
        }

        // topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public void handleHere(@Payload Object mess) throws JsonProcessingException {
            String[]  topics = adapter.getTopic();
            for(String topic:topics)
                System.out.println(topic); // How can I get topic name which is using a wildcard?
            ObjectMapper objectMapper = new ObjectMapper();
            String json=mess.toString();
            System.out.println(json);
            CountVo countVo = objectMapper.readValue(json, CountVo.class);
            if (!countVo.equals(null))
                System.out.println(countVo.getIrisysCounts().get(0).getName());

        }
    }

}

补充问题

使用通配符时如何获取完整的主题名称?已发布但被通配符捕获的实际主题。

请帮忙。

添加路由器(.route(...));您可以在 MqttHeaders.RECEIVED_TOPIC header(包含主题名称)上路由到每个主题的不同流程。

https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#messaging-routing-chapter

编辑

最简单的路由器是简单地将主题名称映射到频道名称。这是一个例子:

@SpringBootApplication
public class So67391175Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391175Application.class, args);
    }

    @Bean
    public DefaultMqttPahoClientFactory pahoClientFactory() {
        DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions connectionOptions = new MqttConnectOptions();
        connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
        pahoClientFactory.setConnectionOptions(connectionOptions);
        return pahoClientFactory;
    }

    @Bean
    public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
        return IntegrationFlows.from(
                new MqttPahoMessageDrivenChannelAdapter("testClient",
                        pahoClientFactory, "topic1", "topic2"))
                .route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
                .get();
    }

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows.from("topic1")
                .handle((payload, headers) -> {
                    System.out.println("message from topic1 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow flow2() {
        return IntegrationFlows.from("topic2")
                .handle((payload, headers) -> {
                    System.out.println("message from topic2 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }

}

message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}

message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}

谢谢加里!我认为你在路由上给出的答案只能采用定义的主题,而不是通配符或任何其他正则表达式。我无法理解动态路由对我有何帮助。

事实证明,我可以在初始化 bean 时添加通配符,并且可以使用适配器处理在输入通道上使用服务激活器。

像这样:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttJavaApplication.class,args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "irisys/V4D-20230143/status"   );
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Component
    public class MyService{

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        @Bean
        public String addTopics()
        {
            if(adapter.getTopic().length>0)
            {
                adapter.addTopic("camera/+/counts");
            }
            return "";
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {

                @SneakyThrows
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.out.println(message.getPayload());
                    System.out.println(message.getHeaders());
                    if(message.getHeaders().get("mqtt_receivedTopic").toString().contains("counts"))
                    {
                        ObjectMapper objectMapper = new ObjectMapper();
                        String json=message.getPayload().toString();
                        System.out.println(json);
                        CountVo countVo = objectMapper.readValue(json, CountVo.class);
                        if (!countVo.equals(null))
                            System.out.println(countVo.getIrisysCounts().get(0).getName());
                    }

                }

            };
        }
    }
}

您认为还有比这更好的方法吗?除了这个我想不出别的了。