Spring 集成和 IBM MQ
Spring Integration and IBM MQ
帮助我尝试连接 Spring Integration Ibm mq,我在做什么:
public Publisher<Message<String>> jmsReactiveSource() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(ibmConnectionFactory)
.destination("DEV.QUEUE.1"))
.channel(MessageChannels.queue())
.log(org.springframework.integration.handler.LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping("/event")
public Mono<String> getEvent() {
return Mono.from(jmsReactiveSource())
.log()
.map (Message::getPayload);
}
@GetMapping("/pub")
public void produce() {
jmsTemplateIbm.convertAndSend("DEV.QUEUE.1", "MESSAGE");
}
当我在调用 /pub 之后调用 /event 时没有任何反应,消息没有被扣除,我做错了什么,我需要让 ibm mq 成为非阻塞的,因为大量的资源都花在了等待响应上来自 IBM MQ。
请参阅 Spring 关于 Java DSL 的集成文档:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl。我们在示例中无处不在谈论将 IntegrationFlow
作为 @Bean
。只有这样 IntegrationFlowBeanPostProcessor
才能理解 DSL 定义并在应用程序上下文中注册相应的 bean。带有那些 @GetMapping
方法的整个 class 和 jmsReactiveSource()
的这个 @Bean
必须作为一个 @Configuration
来调用 jmsReactiveSource()
方法作为来自应用程序上下文的 bean 请求。否则,如果您不想让这个 @RestController
成为这样的一个,您应该考虑将这个流 bean 移到单独的 @Configuration
class 中。然后你需要将那个 Publisher<Message<String>>
bean 自动装配到这个控制器 class.
更新
关于提早消费的第二个问题,您必须将 Jms.messageDrivenChannelAdapter()
标记为 autoStartup(false)
并从 getEvent()
中的 Mono
使用它的 doOnRequest()
。适配器上有一个 id()
选项供您考虑正确自动连接该端点。你是对的:即使它最终是反应性的,但开始时不是并且必须分别处理以在请求之前不消耗。
您也可以考虑 return Flux
而不是 Mono
:整个过程被认为是无界的。这样的REST请求也应该改为SSE:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-codecs-streaming
帮助我尝试连接 Spring Integration Ibm mq,我在做什么:
public Publisher<Message<String>> jmsReactiveSource() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(ibmConnectionFactory)
.destination("DEV.QUEUE.1"))
.channel(MessageChannels.queue())
.log(org.springframework.integration.handler.LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping("/event")
public Mono<String> getEvent() {
return Mono.from(jmsReactiveSource())
.log()
.map (Message::getPayload);
}
@GetMapping("/pub")
public void produce() {
jmsTemplateIbm.convertAndSend("DEV.QUEUE.1", "MESSAGE");
}
当我在调用 /pub 之后调用 /event 时没有任何反应,消息没有被扣除,我做错了什么,我需要让 ibm mq 成为非阻塞的,因为大量的资源都花在了等待响应上来自 IBM MQ。
请参阅 Spring 关于 Java DSL 的集成文档:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl。我们在示例中无处不在谈论将 IntegrationFlow
作为 @Bean
。只有这样 IntegrationFlowBeanPostProcessor
才能理解 DSL 定义并在应用程序上下文中注册相应的 bean。带有那些 @GetMapping
方法的整个 class 和 jmsReactiveSource()
的这个 @Bean
必须作为一个 @Configuration
来调用 jmsReactiveSource()
方法作为来自应用程序上下文的 bean 请求。否则,如果您不想让这个 @RestController
成为这样的一个,您应该考虑将这个流 bean 移到单独的 @Configuration
class 中。然后你需要将那个 Publisher<Message<String>>
bean 自动装配到这个控制器 class.
更新
关于提早消费的第二个问题,您必须将 Jms.messageDrivenChannelAdapter()
标记为 autoStartup(false)
并从 getEvent()
中的 Mono
使用它的 doOnRequest()
。适配器上有一个 id()
选项供您考虑正确自动连接该端点。你是对的:即使它最终是反应性的,但开始时不是并且必须分别处理以在请求之前不消耗。
您也可以考虑 return Flux
而不是 Mono
:整个过程被认为是无界的。这样的REST请求也应该改为SSE:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-codecs-streaming