ConcurrentConsumers 不是使用 DefaultMessageListenerContainer 创建的,而是使用 PooledConnectionFactory 上的 maximumActiveSessionsPerConnection 创建的

ConcurrentConsumers not created using DefaultMessageListenerContainer but using maximumActiveSessionsPerConnection on PooledConnectionFactory

我关注 Java class。当与 CachingConnectionFactory 一起使用时,它会在 DefaultMessageListenerContainer 上创建 ConcurrentConsumers 的配置数量。但是,如果使用 PooledConnectionFactory 而不是 CachingConnectionFactory,它只会创建 concurrentConsumers 等于 maximumActiveSessionPerConnection set on PooledConnectionFactory 而不是 concurrentConsumers set on DefaultMessageListenerContainer.

如何确保 DefaultMessageListenerContainer 使用 PooledConnectionFactory 提供的多个 connections/Sessions 并创建提供给 DefaultMessageListenerContainer 的配置数量的 concurrentConsumer。下面是检查相同的简单示例。

    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.jms.pool.PooledConnectionFactory;
    import org.springframework.jms.listener.DefaultMessageListenerContainer;

    public class ActiveMQMainTest {

        public static void main(String[] args) {
            String queueUrl = "tcp://localhost:61616";
            ActiveMQQueue queue = new ActiveMQQueue("request.queue");

            final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(queueUrl);

            PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
            pooledConnectionFactory.setConnectionFactory(connectionFactory);
            pooledConnectionFactory.setCreateConnectionOnStartup(false);
            pooledConnectionFactory.setMaxConnections(5);
            pooledConnectionFactory.setMaximumActiveSessionPerConnection(100);
            pooledConnectionFactory.start();

            // CachingConnectionFactory pooledConnectionFactory = new CachingConnectionFactory(connectionFactory);

            DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
            defaultMessageListenerContainer.setConnectionFactory(pooledConnectionFactory);
            defaultMessageListenerContainer.setDestination(queue);
            defaultMessageListenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
            defaultMessageListenerContainer.setConcurrentConsumers(5);
            defaultMessageListenerContainer.setMaxConcurrentConsumers(5 * 2);
            defaultMessageListenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
            defaultMessageListenerContainer.setSessionTransacted(true);
            JmsMessageListener messageListener = new JmsMessageListener();
            defaultMessageListenerContainer.setMessageListener(messageListener);
            defaultMessageListenerContainer.afterPropertiesSet();
            defaultMessageListenerContainer.start();


            try {
                Thread.sleep(1000 * 60 * 10);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

DMLC 默认使用共享连接(当没有事务管理器时)。可以使用以下方式禁用它:

dmlc.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);

您通常还应该 setSessionTransacted(true) 使用 DMLC,以避免丢失消息的可能性(使用 DMLC,消息在调用侦听器之前被确认),使用本地事务,确认成功直到监听器正常退出才去找broker