Spring 集成:如何将 SingleConnectionFactory 与 ActiveMQ 一起使用?
Spring Integration: How to use SingleConnectionFactory with ActiveMQ?
Spring 启动 2.3.1.RELEASE.
使用 spring.jms.cache.enabled=true
(默认),Spring 创建一个 CachingConnectionFactory
:
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
matchIfMissing = true)
static class CachingConnectionFactoryConfiguration {
这很糟糕,因为 it shouldn't be used with DefaultMessageListenerContainer
。我认为这就是为什么我的某些消息会“丢失”直到它们突然重新出现的原因。
使用 spring.jms.cache.enabled=false
,Spring 创建一个 ActiveMQConnectionFactory
:
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return createJmsConnectionFactory(properties, factoryCustomizers);
}
private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,
这很糟糕,因为在每次轮询时,它都会创建一个到 Broker 的新连接 - 用数百个连接淹没我的代理。
所以我虽然解决我的问题是使用 SingleConnectionFactory
。在 AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory
我看到:
public Connection createConnection() throws JMSException {
if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
return new SingleConnectionFactory(sharedCon).createConnection();
}
所以我想我会:
Jms.channel(connectionFactory)
.cacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION)
但事实证明,此方法从未被调用,只有 JmsAccessor.createConnection()
创建了一个 ActiveMQConnectionFactory
。我的缓存级别没有影响。
那么如何正确使用SingleConnectionFactory
?
如果您具有可变并发性,缓存工厂只是 DMLC 的一个问题。
只需将 SingleConnctionFactory
定义为 @Bean
并使用 Jms.channel(mySingleCF())...
.
编辑
@SpringBootApplication
public class So63120705Application {
public static void main(String[] args) {
SpringApplication.run(So63120705Application.class, args).close(); // JVM should exit
}
@Bean
public ApplicationRunner runner(ConnectionFactory jmsConnectionFactory, IntegrationFlowContext context,
JmsTemplate template) {
return args -> {
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(jmsConnectionFactory);
IntegrationFlow flow = f -> f.channel(Jms.channel(connectionFactory)
.destination("foo"))
.handle(System.out::println);
context.registration(flow)
.id("jms")
.addBean(connectionFactory)
.register();
template.convertAndSend("foo", "test");
Thread.sleep(5_000);
};
}
}
spring.jms.cache.enabled=false
Spring 启动 2.3.1.RELEASE.
使用 spring.jms.cache.enabled=true
(默认),Spring 创建一个 CachingConnectionFactory
:
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
matchIfMissing = true)
static class CachingConnectionFactoryConfiguration {
这很糟糕,因为 it shouldn't be used with DefaultMessageListenerContainer
。我认为这就是为什么我的某些消息会“丢失”直到它们突然重新出现的原因。
使用 spring.jms.cache.enabled=false
,Spring 创建一个 ActiveMQConnectionFactory
:
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return createJmsConnectionFactory(properties, factoryCustomizers);
}
private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,
这很糟糕,因为在每次轮询时,它都会创建一个到 Broker 的新连接 - 用数百个连接淹没我的代理。
所以我虽然解决我的问题是使用 SingleConnectionFactory
。在 AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory
我看到:
public Connection createConnection() throws JMSException {
if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
return new SingleConnectionFactory(sharedCon).createConnection();
}
所以我想我会:
Jms.channel(connectionFactory)
.cacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION)
但事实证明,此方法从未被调用,只有 JmsAccessor.createConnection()
创建了一个 ActiveMQConnectionFactory
。我的缓存级别没有影响。
那么如何正确使用SingleConnectionFactory
?
如果您具有可变并发性,缓存工厂只是 DMLC 的一个问题。
只需将 SingleConnctionFactory
定义为 @Bean
并使用 Jms.channel(mySingleCF())...
.
编辑
@SpringBootApplication
public class So63120705Application {
public static void main(String[] args) {
SpringApplication.run(So63120705Application.class, args).close(); // JVM should exit
}
@Bean
public ApplicationRunner runner(ConnectionFactory jmsConnectionFactory, IntegrationFlowContext context,
JmsTemplate template) {
return args -> {
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(jmsConnectionFactory);
IntegrationFlow flow = f -> f.channel(Jms.channel(connectionFactory)
.destination("foo"))
.handle(System.out::println);
context.registration(flow)
.id("jms")
.addBean(connectionFactory)
.register();
template.convertAndSend("foo", "test");
Thread.sleep(5_000);
};
}
}
spring.jms.cache.enabled=false