Spring 集成:如何将 SingleConnectionFactory 与 ActiveMQ 一起使用?

Spring Integration: How to use SingleConnectionFactory with ActiveMQ?

Spring 启动 2.3.1.RELEASE.

使用 spring.jms.cache.enabled=true(默认),Spring 创建一个 CachingConnectionFactory:

    @ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
            matchIfMissing = true)
    static class CachingConnectionFactoryConfiguration {

这很糟糕,因为 it shouldn't be used with DefaultMessageListenerContainer。我认为这就是为什么我的某些消息会“丢失”直到它们突然重新出现的原因。

使用 spring.jms.cache.enabled=false,Spring 创建一个 ActiveMQConnectionFactory:

    @ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
    ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        return createJmsConnectionFactory(properties, factoryCustomizers);
    }

    private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,

这很糟糕,因为在每次轮询时,它都会创建一个到 Broker 的新连接 - 用数百个连接淹没我的代理。

所以我虽然解决我的问题是使用 SingleConnectionFactory。在 AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory 我看到:

    public Connection createConnection() throws JMSException {
        if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
            Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
            return new SingleConnectionFactory(sharedCon).createConnection();
        }

所以我想我会:

Jms.channel(connectionFactory)
    .cacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION)

但事实证明,此方法从未被调用,只有 JmsAccessor.createConnection() 创建了一个 ActiveMQConnectionFactory。我的缓存级别没有影响。

那么如何正确使用SingleConnectionFactory

如果您具有可变并发性,缓存工厂只是 DMLC 的一个问题。

只需将 SingleConnctionFactory 定义为 @Bean 并使用 Jms.channel(mySingleCF())....

编辑

@SpringBootApplication
public class So63120705Application {

    public static void main(String[] args) {
        SpringApplication.run(So63120705Application.class, args).close(); // JVM should exit
    }

    @Bean
    public ApplicationRunner runner(ConnectionFactory jmsConnectionFactory, IntegrationFlowContext context,
            JmsTemplate template) {

        return args -> {
            SingleConnectionFactory connectionFactory = new SingleConnectionFactory(jmsConnectionFactory);

            IntegrationFlow flow = f -> f.channel(Jms.channel(connectionFactory)
                                .destination("foo"))
                        .handle(System.out::println);

            context.registration(flow)
                .id("jms")
                .addBean(connectionFactory)
                .register();

            template.convertAndSend("foo", "test");
            Thread.sleep(5_000);
        };
    }

}
spring.jms.cache.enabled=false