是否可以使用 web flux spring 集成创建队列侦听器?

is it possible to create a queue listener using web flux spring integration?

    @Component
    @RequiredArgsConstructor
    public class EventListener {
    
        private final EventProcessingService eventProcessingService;

        @JmsListener(destination = "inputQueue", constainerFactory = "myContainerFactory)
        public void receiveMessage(Message message) {
             eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
        }
    }
    


     @Service
     public class EventProcessingService {

         public Mono<Void> doSome(Message message) {
            //...
         }
     }
  
    @Configuration
    @RequiredArgsConstructor
    public class MqIntegration {
        
        private final ConnectionFactory connectionFactory;

        @Bean
        public Publisher<Message<String>> mqReactiveFlow() {
            return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .toReactivePublisher();
        }
    }

我有一些与 ibm mq 交互的 webflux 应用程序和一个 JmsListener,它在收到消息时侦听队列中的消息 EventProcessingService 根据消息向其他服务发出请求。 我想知道如何使用 Spring 集成创建一个与反应线程一起工作的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在接收到消息时调用 EvenProcessingService,这样它就不会对 webflux application[=12 中的线程产生负面影响=]

我认为您将不得不绕过 @JmsListener,因为它正在注册一个 on 消息,尽管异步不会是反应性的。 JMS 本质上是阻塞的,因此在顶部修补一个反应层将只是一个补丁。

您将需要使用您创建的 Publisher 来生成背压。我认为您将不得不定义和实例化您自己的侦听器 bean,它执行以下操作:

    public Flux<String> mqReactiveListener() {
        return Flux.from(mqReactiveFlow())
                .map(Message::getPayload);
    }

我认为我们需要澄清您问题中的一些要点。

  1. WebFlux 本身并不是一个项目。它是 Spring 反应式服务器之上关于 Web 的框架模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
  2. @JmsListener 是另一个 Spring 框架模块的一部分 - spring-jms。并且与反应式服务器用于 WebFlux 层的线程无关。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms
  3. Spring 集成是一个单独的项目,它在 Spring 框架依赖注入容器之上实现 EIP。它确实有自己的 WebFlux 模块,用于在 Spring 框架中的 WebFlux API 之上的通道适配器:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux. And it also has a JMS module on top of JMS module from Spring Framework: https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。然而,与 @JmsLisntener 没有任何关系,因为它的 Jms.messageDrivenChannelAdapter() 完全涵盖了该功能,并且从一个很大的高度来看它以相同的方式实现它 - 通过 MessageListenerContainer.

所有这些可能与问题无关,但最好清楚地了解您所问的内容,这样我们会觉得我们与您在同一页面上。

现在正在尝试回答您的问题。

只要您不从 WebFlux 层(@RequestMappingWebFlux.inboundGateway())处理 JMS,就不会影响那些非阻塞线程。 JMS MessageListenerContainer 生成自己的线程并执行从队列中拉取和消息处理。

您对 JMS 配置和服务的解释看起来更像这样:

 @Bean
 public IntegrationFlow mqReactiveFlow() {
       return IntegrationFlows
             .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                      .destination("testQueue"))
             .handle(this.eventProcessingService)
             .nullChannel();
 }

确实没有理由将 JMS 之后的消息转移到 QueueChannel,因为 JMS 监听已经是一个异步操作。

我们需要在您的流程结束时 nullChannel 只是因为您的服务方法 returns Mono 和框架不知道如何处理它。从版本 5.4.3 开始, NullChannel 能够订阅生成给它的消息的 Publisher 负载。

您可以在两者之间设置一个 FluxMessageChannel 来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。