是否可以使用 web flux spring 集成创建队列侦听器?
is it possible to create a queue listener using web flux spring integration?
@Component
@RequiredArgsConstructor
public class EventListener {
private final EventProcessingService eventProcessingService;
@JmsListener(destination = "inputQueue", constainerFactory = "myContainerFactory)
public void receiveMessage(Message message) {
eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
}
}
@Service
public class EventProcessingService {
public Mono<Void> doSome(Message message) {
//...
}
}
@Configuration
@RequiredArgsConstructor
public class MqIntegration {
private final ConnectionFactory connectionFactory;
@Bean
public Publisher<Message<String>> mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.channel(MessageChannels.queue())
.toReactivePublisher();
}
}
我有一些与 ibm mq 交互的 webflux 应用程序和一个 JmsListener,它在收到消息时侦听队列中的消息 EventProcessingService 根据消息向其他服务发出请求。
我想知道如何使用 Spring 集成创建一个与反应线程一起工作的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在接收到消息时调用 EvenProcessingService,这样它就不会对 webflux application[=12 中的线程产生负面影响=]
我认为您将不得不绕过 @JmsListener
,因为它正在注册一个 on 消息,尽管异步不会是反应性的。 JMS 本质上是阻塞的,因此在顶部修补一个反应层将只是一个补丁。
您将需要使用您创建的 Publisher
来生成背压。我认为您将不得不定义和实例化您自己的侦听器 bean,它执行以下操作:
public Flux<String> mqReactiveListener() {
return Flux.from(mqReactiveFlow())
.map(Message::getPayload);
}
我认为我们需要澄清您问题中的一些要点。
- WebFlux 本身并不是一个项目。它是 Spring 反应式服务器之上关于 Web 的框架模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
@JmsListener
是另一个 Spring 框架模块的一部分 - spring-jms
。并且与反应式服务器用于 WebFlux 层的线程无关。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms
- Spring 集成是一个单独的项目,它在 Spring 框架依赖注入容器之上实现 EIP。它确实有自己的 WebFlux 模块,用于在 Spring 框架中的 WebFlux API 之上的通道适配器:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux. And it also has a JMS module on top of JMS module from Spring Framework: https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。然而,与
@JmsLisntener
没有任何关系,因为它的 Jms.messageDrivenChannelAdapter()
完全涵盖了该功能,并且从一个很大的高度来看它以相同的方式实现它 - 通过 MessageListenerContainer
.
所有这些可能与问题无关,但最好清楚地了解您所问的内容,这样我们会觉得我们与您在同一页面上。
现在正在尝试回答您的问题。
只要您不从 WebFlux 层(@RequestMapping
或 WebFlux.inboundGateway()
)处理 JMS,就不会影响那些非阻塞线程。 JMS MessageListenerContainer
生成自己的线程并执行从队列中拉取和消息处理。
您对 JMS 配置和服务的解释看起来更像这样:
@Bean
public IntegrationFlow mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.handle(this.eventProcessingService)
.nullChannel();
}
确实没有理由将 JMS 之后的消息转移到 QueueChannel
,因为 JMS 监听已经是一个异步操作。
我们需要在您的流程结束时 nullChannel
只是因为您的服务方法 returns Mono
和框架不知道如何处理它。从版本 5.4.3
开始, NullChannel
能够订阅生成给它的消息的 Publisher
负载。
您可以在两者之间设置一个 FluxMessageChannel
来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。
@Component
@RequiredArgsConstructor
public class EventListener {
private final EventProcessingService eventProcessingService;
@JmsListener(destination = "inputQueue", constainerFactory = "myContainerFactory)
public void receiveMessage(Message message) {
eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
}
}
@Service
public class EventProcessingService {
public Mono<Void> doSome(Message message) {
//...
}
}
@Configuration
@RequiredArgsConstructor
public class MqIntegration {
private final ConnectionFactory connectionFactory;
@Bean
public Publisher<Message<String>> mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.channel(MessageChannels.queue())
.toReactivePublisher();
}
}
我有一些与 ibm mq 交互的 webflux 应用程序和一个 JmsListener,它在收到消息时侦听队列中的消息 EventProcessingService 根据消息向其他服务发出请求。 我想知道如何使用 Spring 集成创建一个与反应线程一起工作的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在接收到消息时调用 EvenProcessingService,这样它就不会对 webflux application[=12 中的线程产生负面影响=]
我认为您将不得不绕过 @JmsListener
,因为它正在注册一个 on 消息,尽管异步不会是反应性的。 JMS 本质上是阻塞的,因此在顶部修补一个反应层将只是一个补丁。
您将需要使用您创建的 Publisher
来生成背压。我认为您将不得不定义和实例化您自己的侦听器 bean,它执行以下操作:
public Flux<String> mqReactiveListener() {
return Flux.from(mqReactiveFlow())
.map(Message::getPayload);
}
我认为我们需要澄清您问题中的一些要点。
- WebFlux 本身并不是一个项目。它是 Spring 反应式服务器之上关于 Web 的框架模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
@JmsListener
是另一个 Spring 框架模块的一部分 -spring-jms
。并且与反应式服务器用于 WebFlux 层的线程无关。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms- Spring 集成是一个单独的项目,它在 Spring 框架依赖注入容器之上实现 EIP。它确实有自己的 WebFlux 模块,用于在 Spring 框架中的 WebFlux API 之上的通道适配器:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux. And it also has a JMS module on top of JMS module from Spring Framework: https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。然而,与
@JmsLisntener
没有任何关系,因为它的Jms.messageDrivenChannelAdapter()
完全涵盖了该功能,并且从一个很大的高度来看它以相同的方式实现它 - 通过MessageListenerContainer
.
所有这些可能与问题无关,但最好清楚地了解您所问的内容,这样我们会觉得我们与您在同一页面上。
现在正在尝试回答您的问题。
只要您不从 WebFlux 层(@RequestMapping
或 WebFlux.inboundGateway()
)处理 JMS,就不会影响那些非阻塞线程。 JMS MessageListenerContainer
生成自己的线程并执行从队列中拉取和消息处理。
您对 JMS 配置和服务的解释看起来更像这样:
@Bean
public IntegrationFlow mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.handle(this.eventProcessingService)
.nullChannel();
}
确实没有理由将 JMS 之后的消息转移到 QueueChannel
,因为 JMS 监听已经是一个异步操作。
我们需要在您的流程结束时 nullChannel
只是因为您的服务方法 returns Mono
和框架不知道如何处理它。从版本 5.4.3
开始, NullChannel
能够订阅生成给它的消息的 Publisher
负载。
您可以在两者之间设置一个 FluxMessageChannel
来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。