DefaultJmsListenerContainerFactory 和并发连接未关闭
DefaultJmsListenerContainerFactory and Concurrent Connections not shutting down
我正在使用 Spring 4.x 的 DefaultJmsListenerContainerFactory 以连接到 ActiveMQ 队列,使用 @JmsListener 处理来自该队列的消息,然后将消息推送到相同的 ActiveMQ 代理。
我为 consumer/listener 和生产者使用一个缓存连接工厂,并将缓存消费者设置为 false,这样我可以缓存生产者,但不能缓存消费者。我还将并发设置为 1-3,我希望在应用程序启动时队列中至少有 1 个消费者,并且随着消息的增加,消费者的数量将达到 3。但是,随着消息减少,我原以为消费者的数量也会回落到 1。但是,如果我看一下线程 (defaultmessagelistenercontainer-2/3),它们处于等待状态,并且不会关闭。当负载消退时,消费者的数量也将关闭,这不是预期的行为吗?请查看下面我的配置,让我知道这种行为是否不是开箱即用的,以及我是否需要添加一些东西来使它像我上面列出的那样工作。
ApplicationContext.java
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));
ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
activeMQ.setRedeliveryPolicy(redeliveryPolicy);
activeMQ.setPrefetchPolicy(prefetchPolicy());
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
jmsMessagingTemplate.setDefaultDestination(activeMQ);
return jmsMessagingTemplate;
}
application.properties
jms.connections.concurrent=1-3
jms.connections.prefetch=1000
jms.connections.transacted=true
jms.connections.cache.consumers=false
jms.redelivery.initial-delay=1000
jms.redelivery.delay=1000
jms.redelivery.maximum=5
jms.redelivery.use-exponential-back-off=true
jms.redelivery.back-off-multiplier=2
jms.cache.size=3
jms.queue.in=in.queue
jms.queue.out=out.queue
jms.broker.endpoint=failover:(tcp://localhost:61616)
尝试设置 maxMessagesPerTask > 0
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxMessagesPerTask(1);
factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
return factory;
}
jms.connections.prefetch=1000
意味着如果您有 1000 条消息在 Q 上等待,您将只有 1 个线程开始处理这 1000 条消息。
例如 jms.connections.prefetch=1
意味着消息将被平均分派给所有可用的线程,但是最好设置 maxMessagesPerTask < 0
因为长期任务避免频繁的线程上下文切换。
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
我正在使用 Spring 4.x 的 DefaultJmsListenerContainerFactory 以连接到 ActiveMQ 队列,使用 @JmsListener 处理来自该队列的消息,然后将消息推送到相同的 ActiveMQ 代理。
我为 consumer/listener 和生产者使用一个缓存连接工厂,并将缓存消费者设置为 false,这样我可以缓存生产者,但不能缓存消费者。我还将并发设置为 1-3,我希望在应用程序启动时队列中至少有 1 个消费者,并且随着消息的增加,消费者的数量将达到 3。但是,随着消息减少,我原以为消费者的数量也会回落到 1。但是,如果我看一下线程 (defaultmessagelistenercontainer-2/3),它们处于等待状态,并且不会关闭。当负载消退时,消费者的数量也将关闭,这不是预期的行为吗?请查看下面我的配置,让我知道这种行为是否不是开箱即用的,以及我是否需要添加一些东西来使它像我上面列出的那样工作。
ApplicationContext.java
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));
ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
activeMQ.setRedeliveryPolicy(redeliveryPolicy);
activeMQ.setPrefetchPolicy(prefetchPolicy());
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
jmsMessagingTemplate.setDefaultDestination(activeMQ);
return jmsMessagingTemplate;
}
application.properties
jms.connections.concurrent=1-3
jms.connections.prefetch=1000
jms.connections.transacted=true
jms.connections.cache.consumers=false
jms.redelivery.initial-delay=1000
jms.redelivery.delay=1000
jms.redelivery.maximum=5
jms.redelivery.use-exponential-back-off=true
jms.redelivery.back-off-multiplier=2
jms.cache.size=3
jms.queue.in=in.queue
jms.queue.out=out.queue
jms.broker.endpoint=failover:(tcp://localhost:61616)
尝试设置 maxMessagesPerTask > 0
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxMessagesPerTask(1);
factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
return factory;
}
jms.connections.prefetch=1000
意味着如果您有 1000 条消息在 Q 上等待,您将只有 1 个线程开始处理这 1000 条消息。
例如 jms.connections.prefetch=1
意味着消息将被平均分派给所有可用的线程,但是最好设置 maxMessagesPerTask < 0
因为长期任务避免频繁的线程上下文切换。
http://activemq.apache.org/what-is-the-prefetch-limit-for.html