Spring 集成和 IBM MQ

Spring Integration and IBM MQ

帮助我尝试连接 Spring Integration Ibm mq,我在做什么:

public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(ibmConnectionFactory)
                        .destination("DEV.QUEUE.1"))
                .channel(MessageChannels.queue())
                .log(org.springframework.integration.handler.LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping("/event")
    public Mono<String> getEvent() {
        return Mono.from(jmsReactiveSource())
                .log()
                .map (Message::getPayload);
    }
    

    @GetMapping("/pub")
    public void produce() {
        jmsTemplateIbm.convertAndSend("DEV.QUEUE.1", "MESSAGE");
    }

当我在调用 /pub 之后调用 /event 时没有任何反应,消息没有被扣除,我做错了什么,我需要让 ibm mq 成为非阻塞的,因为大量的资源都花在了等待响应上来自 IBM MQ。

请参阅 Spring 关于 Java DSL 的集成文档:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl。我们在示例中无处不在谈论将 IntegrationFlow 作为 @Bean。只有这样 IntegrationFlowBeanPostProcessor 才能理解 DSL 定义并在应用程序上下文中注册相应的 bean。带有那些 @GetMapping 方法的整个 class 和 jmsReactiveSource() 的这个 @Bean 必须作为一个 @Configuration 来调用 jmsReactiveSource() 方法作为来自应用程序上下文的 bean 请求。否则,如果您不想让这个 @RestController 成为这样的一个,您应该考虑将这个流 bean 移到单独的 @Configuration class 中。然后你需要将那个 Publisher<Message<String>> bean 自动装配到这个控制器 class.

更新

关于提早消费的第二个问题,您必须将 Jms.messageDrivenChannelAdapter() 标记为 autoStartup(false) 并从 getEvent() 中的 Mono 使用它的 doOnRequest()。适配器上有一个 id() 选项供您考虑正确自动连接该端点。你是对的:即使它最终是反应性的,但开始时不是并且必须分别处理以在请求之前不消耗。

您也可以考虑 return Flux 而不是 Mono:整个过程被认为是无界的。这样的REST请求也应该改为SSE:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-codecs-streaming