无法使用测试请求的消息

Can't consume message that is enqeued by test

我在编写将消息发布到点对点队列的基本测试时遇到问题。

使用 @JmsListener bean 时,会消耗消息。 当不使用 @JmsListener 并在测试 class 中使用通过 @Autowired JmsTemplateconnectionFactory 获得的消费者时,消息不会被消费。

我添加了一些日志记录和调试输出,但不明白为什么我不能在测试中使用消息 class 但 @JmsListener bean 可以。

@SpringBootTest
@ActiveProfiles("tc")
@Log4j2
public class SessionActiveMQIT {

    @Autowired
    public JmsTemplate jmsTemplate;

    @Test
    void canEnqueueAndPersistClientAck() throws JMSException, InterruptedException {

        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST_QUEUE");


        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setSessionAcknowledgeMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestination(activeMQQueue);
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.setPubSubNoLocal(false);

        final ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        activeMQTextMessage.setText("MESSAGE");
        activeMQTextMessage.setPersistent(true);

        jmsTemplate.execute("TEST_QUEUE", ((session, messageProducer) -> {
            try {
                log.info("Sending to Queue.");
                messageProducer.send(activeMQTextMessage, DeliveryMode.PERSISTENT, 4, 30000);
                session.commit();
                session.close();
                log.info("Committed and Closed.");
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
                session.rollback();
                session.close();
            }
            return session;
        }));

        log.info("Create session from conn factory.");
        final Session session = jmsTemplate.getConnectionFactory().createConnection().createSession();

        log.info("Consumer creation.");
        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(activeMQQueue);
        
        log.info("Consume Message");

        log.info(consumer.receive(100L));
    }

}

日志输出:


02 Mar 2021 16:48:34,298 [ INFO] --- o.a.a.b.BrokerService          : Using Persistence Adapter: MemoryPersistenceAdapter
02 Mar 2021 16:48:34,438 [ INFO] --- o.a.a.b.BrokerService          : Apache ActiveMQ 5.16.1 (localhost, ID:devbox-44103-1614703714311-0:1) is starting
02 Mar 2021 16:48:34,442 [DEBUG] --- o.a.a.b.j.Log4JConfigView      : Could not locate log4j classes on classpath.
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService          : Apache ActiveMQ 5.16.1 (localhost, ID:devbox-44103-1614703714311-0:1) started
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService          : For help or more information please see: http://activemq.apache.org
02 Mar 2021 16:48:34,445 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.MasterBroker
02 Mar 2021 16:48:34,452 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ BrokerService[localhost] Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@55bf08a5[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,456 [DEBUG] --- o.a.a.t.v.VMTransportFactory   : binding to broker: localhost
02 Mar 2021 16:48:34,459 [ INFO] --- o.a.a.b.TransportConnector     : Connector vm://localhost started
02 Mar 2021 16:48:34,463 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#0] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@297db6ad[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,472 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#1] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@170437d4[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,474 [DEBUG] --- o.a.a.b.TransportConnection    : Setting up new connection id: ID:devbox-44103-1614703714311-4:1, address: vm://localhost#0, info: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:devbox-44103-1614703714311-4:1, clientId = ID:devbox-44103-1614703714311-3:1, clientIp = null, userName = admin, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faultTolerant = false, failoverReconnect = false}
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Connection
02 Mar 2021 16:48:34,480 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding consumer: ID:devbox-44103-1614703714311-4:1:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
02 Mar 2021 16:48:34,514 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,529 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE, subscriptions=0, memory=0%, size=0, pending=0 toPageIn: 0, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 0, dequeueCount: 0, memUsage:0, maxPageSize:200
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Queue
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Producer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,534 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Sending to Queue.
02 Mar 2021 16:48:34,535 [DEBUG] --- o.a.a.TransactionContext       : Begin:TX:ID:devbox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.ActiveMQSession          : ID:devbox-44103-1614703714311-4:1:1 Transaction Commit :TX:ID:devbox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.TransactionContext       : Commit: TX:ID:devbox-44103-1614703714311-4:1:1 syncCount: 0
02 Mar 2021 16:48:34,539 [DEBUG] --- o.a.a.t.LocalTransaction       : commit: TX:ID:devbox-44103-1614703714311-4:1:1 syncCount: 1
02 Mar 2021 16:48:34,540 [DEBUG] --- o.a.a.b.r.Queue                : localhost Message ID:devbox-44103-1614703714311-4:1:1:1:1 sent to queue://TEST_QUEUE
02 Mar 2021 16:48:34,541 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Committed and Closed.
02 Mar 2021 16:48:34,541 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE, subscriptions=0, memory=0%, size=1, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1038, maxPageSize:200
02 Mar 2021 16:48:34,545 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Create session from conn factory.
02 Mar 2021 16:48:34,545 [DEBUG] --- o.a.a.b.j.ManagementContext    : Unregistering MBean org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_QUEUE,endpoint=Producer,clientId=ID_devbox-44103-1614703714311-3_1,producerId=ID_devbox-44103-1614703714311-4_1_1_1
02 Mar 2021 16:48:34,546 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Consumer creation.
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,552 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding consumer: ID:devbox-44103-1614703714311-4:1:2:1 for destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,558 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE add sub: QueueSubscription: consumer=ID:devbox-44103-1614703714311-4:1:2:1, destinations=0, dispatched=0, delivered=0, pending=0, prefetch=1000, prefetchExtension=0, dequeues: 0, dispatched: 0, inflight: 0
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1038, maxPageSize:200
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,562 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Consume Message
02 Mar 2021 16:48:34,662 [ INFO] --- m.a.w.c.SessionActiveMQIT      : null

我认为您需要在 javax.jms.Connection 的实例上调用 start() 才能让消息流向消费者,例如:

final Connection connection = jmsTemplate.getConnectionFactory().createConnection();
final Session session = connection.createSession();
connection.start()

此外,请务必在使用完资源(即连接、会话、使用者)后关闭它们。目前它们刚刚超出范围,这意味着它们正在被泄露。我知道这只是一个测试,但这仍然是一个很好的练习。