迁移监听 Spring 与 Spring 引导集成的 JMS 事件的正确最终方法

Proper ultimate way to migrate JMS event listening to Spring Integration with Spring Boot

我有一个 JmsConfig 配置 class 以下列方式处理来自主题的 JMS 事件:

现在我想转到 Spring 集成。阅读了大量内容后,如果我不需要双向使用(丢弃 网关 ),也不需要轮询机制(丢弃 @InboundChannelAdapter ),我打算用传统的 XML 配置措辞 message-driven-channel-adapter。我发现 Java 习语应该通过新的 Spring Integration DSL 库来完成,因此,我寻找合适的片段。

似乎 JmsMessageDrivenChannelAdapter 是正确的等价物,我找到了一个方法:

IntegrationFlows.from(Jms.messageDriverChannelAdapter(...))

但问题是这只接受 ActiveMQ ConnectionFactory 或 AbstractMessageListenerContainer,但没有我的引导预配置 JmsListenerContainerFactory !

最终应该如何实现?

JmsListenerContainerFactory 特定于 @JmsListener,它是用于配置 DefaultMessageListenerContainer 的更高级别的抽象。 Boot 没有为原始 DefaultMessageListenerContainer 提供自动配置选项;你必须自己接线。但是您仍然可以使用引导属性...

@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory, 
                            JmsProperties properties) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(container(connectionFactory, properties)))
            ...
            .get();
}

private DefaultMessageListenerContainer container(ConnectionFactory connectionFactory, 
                                                  JmsProperties properties) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConcurrentConsumers(properties.getListener().getConcurrency());
    container.setMaxConcurrentConsumers(properties.getListener().getMaxConcurrency());
    ...
    return container;
}

还有更好的方法。我很惊讶加里没有评论它。 有一个开箱即用的构建器,名为 Jms.container(...)

@Bean
public IntegrationFlow jmsMyServiceMsgInboundFlow(ConnectionFactory connectionFactory, MessageConverter jmsMessageConverter, MyService myService, JmsProperties jmsProperties, @Value("${mycompany.jms.destination.my-topic}") String topicDestination){

      JmsProperties.Listener jmsInProps = jmsProperties.getListener();

      return IntegrationFlows.from(
                                Jms.messageDrivenChannelAdapter(  Jms.container(connectionFactory, topicDestination)
                                                                     .pubSubDomain(false)
                                                                     .sessionAcknowledgeMode(jmsInProps .getAcknowledgeMode().getMode())
                                                                     .maxMessagesPerTask(1)
                                                                     .errorHandler(e -> e.printStackTrace())
                                                                     .cacheLevel(0)
                                                                     .concurrency(jmsInProps.formatConcurrency())
                                                                     .taskExecutor(Executors.newCachedThreadPool())
                                                                     .get()))
                                   )
                                   .extractPayload(true)
                                   .jmsMessageConverter(jmsMessageConverter)
                                   .destination(topicDestination)
                                   .autoStartup(true)
                                    //.errorChannel("NOPE")
                             )
                             .log(LoggingHandler.Level.DEBUG)
                             .log()
                             .handle(myService, "myMethod", e -> e.async(true).advice(retryAdvice()))
                             .get();