迁移监听 Spring 与 Spring 引导集成的 JMS 事件的正确最终方法
Proper ultimate way to migrate JMS event listening to Spring Integration with Spring Boot
我有一个 JmsConfig
配置 class 以下列方式处理来自主题的 JMS 事件:
- 它定义了一个
@Bean ConnectionFactory
,包含一个ActiveMQ实现
- 它定义了一个
@Bean JmsListenerContainerFactory
实例化一个 DefaultJmsListenerContainerFactory
并通过 Boot 的 DefaultJmsListenerContainerFactoryConfigurer
- 它定义了一个
@Bean MessageConverter
包含一个 MappingJackson2MessageConverter
并设置了一个自定义 ObjectMapper
- 我在我的服务方法上使用指向 myfactory 的
@JmsListener
注释。这是我对该主题的唯一用途,单独订阅。
现在我想转到 Spring 集成。阅读了大量内容后,如果我不需要双向使用(丢弃 网关 ),也不需要轮询机制(丢弃 @InboundChannelAdapter
),我打算用传统的 XML 配置措辞 message-driven-channel-adapter
。我发现 Java 习语应该通过新的 Spring Integration DSL 库来完成,因此,我寻找合适的片段。
似乎 JmsMessageDrivenChannelAdapter
是正确的等价物,我找到了一个方法:
IntegrationFlows.from(Jms.messageDriverChannelAdapter(...))
但问题是这只接受 ActiveMQ ConnectionFactory 或 AbstractMessageListenerContainer
,但没有我的引导预配置 JmsListenerContainerFactory
!
最终应该如何实现?
JmsListenerContainerFactory
特定于 @JmsListener
,它是用于配置 DefaultMessageListenerContainer
的更高级别的抽象。 Boot 没有为原始 DefaultMessageListenerContainer
提供自动配置选项;你必须自己接线。但是您仍然可以使用引导属性...
@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory,
JmsProperties properties) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(container(connectionFactory, properties)))
...
.get();
}
private DefaultMessageListenerContainer container(ConnectionFactory connectionFactory,
JmsProperties properties) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConcurrentConsumers(properties.getListener().getConcurrency());
container.setMaxConcurrentConsumers(properties.getListener().getMaxConcurrency());
...
return container;
}
还有更好的方法。我很惊讶加里没有评论它。
有一个开箱即用的构建器,名为 Jms.container(...)
。
@Bean
public IntegrationFlow jmsMyServiceMsgInboundFlow(ConnectionFactory connectionFactory, MessageConverter jmsMessageConverter, MyService myService, JmsProperties jmsProperties, @Value("${mycompany.jms.destination.my-topic}") String topicDestination){
JmsProperties.Listener jmsInProps = jmsProperties.getListener();
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter( Jms.container(connectionFactory, topicDestination)
.pubSubDomain(false)
.sessionAcknowledgeMode(jmsInProps .getAcknowledgeMode().getMode())
.maxMessagesPerTask(1)
.errorHandler(e -> e.printStackTrace())
.cacheLevel(0)
.concurrency(jmsInProps.formatConcurrency())
.taskExecutor(Executors.newCachedThreadPool())
.get()))
)
.extractPayload(true)
.jmsMessageConverter(jmsMessageConverter)
.destination(topicDestination)
.autoStartup(true)
//.errorChannel("NOPE")
)
.log(LoggingHandler.Level.DEBUG)
.log()
.handle(myService, "myMethod", e -> e.async(true).advice(retryAdvice()))
.get();
我有一个 JmsConfig
配置 class 以下列方式处理来自主题的 JMS 事件:
- 它定义了一个
@Bean ConnectionFactory
,包含一个ActiveMQ实现 - 它定义了一个
@Bean JmsListenerContainerFactory
实例化一个DefaultJmsListenerContainerFactory
并通过 Boot 的DefaultJmsListenerContainerFactoryConfigurer
- 它定义了一个
@Bean MessageConverter
包含一个MappingJackson2MessageConverter
并设置了一个自定义ObjectMapper
- 我在我的服务方法上使用指向 myfactory 的
@JmsListener
注释。这是我对该主题的唯一用途,单独订阅。
现在我想转到 Spring 集成。阅读了大量内容后,如果我不需要双向使用(丢弃 网关 ),也不需要轮询机制(丢弃 @InboundChannelAdapter
),我打算用传统的 XML 配置措辞 message-driven-channel-adapter
。我发现 Java 习语应该通过新的 Spring Integration DSL 库来完成,因此,我寻找合适的片段。
似乎 JmsMessageDrivenChannelAdapter
是正确的等价物,我找到了一个方法:
IntegrationFlows.from(Jms.messageDriverChannelAdapter(...))
但问题是这只接受 ActiveMQ ConnectionFactory 或 AbstractMessageListenerContainer
,但没有我的引导预配置 JmsListenerContainerFactory
!
最终应该如何实现?
JmsListenerContainerFactory
特定于 @JmsListener
,它是用于配置 DefaultMessageListenerContainer
的更高级别的抽象。 Boot 没有为原始 DefaultMessageListenerContainer
提供自动配置选项;你必须自己接线。但是您仍然可以使用引导属性...
@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory,
JmsProperties properties) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(container(connectionFactory, properties)))
...
.get();
}
private DefaultMessageListenerContainer container(ConnectionFactory connectionFactory,
JmsProperties properties) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConcurrentConsumers(properties.getListener().getConcurrency());
container.setMaxConcurrentConsumers(properties.getListener().getMaxConcurrency());
...
return container;
}
还有更好的方法。我很惊讶加里没有评论它。
有一个开箱即用的构建器,名为 Jms.container(...)
。
@Bean
public IntegrationFlow jmsMyServiceMsgInboundFlow(ConnectionFactory connectionFactory, MessageConverter jmsMessageConverter, MyService myService, JmsProperties jmsProperties, @Value("${mycompany.jms.destination.my-topic}") String topicDestination){
JmsProperties.Listener jmsInProps = jmsProperties.getListener();
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter( Jms.container(connectionFactory, topicDestination)
.pubSubDomain(false)
.sessionAcknowledgeMode(jmsInProps .getAcknowledgeMode().getMode())
.maxMessagesPerTask(1)
.errorHandler(e -> e.printStackTrace())
.cacheLevel(0)
.concurrency(jmsInProps.formatConcurrency())
.taskExecutor(Executors.newCachedThreadPool())
.get()))
)
.extractPayload(true)
.jmsMessageConverter(jmsMessageConverter)
.destination(topicDestination)
.autoStartup(true)
//.errorChannel("NOPE")
)
.log(LoggingHandler.Level.DEBUG)
.log()
.handle(myService, "myMethod", e -> e.async(true).advice(retryAdvice()))
.get();