Can't consume message that is enqueued by test
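I'm having trouble writing a basic test that publishes a message to a point-to-point queue.
When a @JmsListener bean is used, the message is consumed.
When no @JmsListener is used and the test class instead uses a consumer obtained from the connectionFactory of the @Autowired JmsTemplate, the message is not consumed.
I added some logging and debug output, but I can't see why the test class cannot consume the message while a @JmsListener bean can.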
@SpringBootTest
@ActiveProfiles("tc")
@Log4j2
public class SessionActiveMQIT {

    @Autowired
    public JmsTemplate jmsTemplate;

    @Test
    void canEnqueueAndPersistClientAck() throws JMSException, InterruptedException {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST_QUEUE");
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setSessionAcknowledgeMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestination(activeMQQueue);
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.setPubSubNoLocal(false);

        final ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        activeMQTextMessage.setText("MESSAGE");
        activeMQTextMessage.setPersistent(true);

        jmsTemplate.execute("TEST_QUEUE", ((session, messageProducer) -> {
            try {
                log.info("Sending to Queue.");
                messageProducer.send(activeMQTextMessage, DeliveryMode.PERSISTENT, 4, 30000);
                session.commit();
                session.close();
                log.info("Committed and Closed.");
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
                session.rollback();
                session.close();
            }
            return session;
        }));

        log.info("Create session from conn factory.");
        final Session session = jmsTemplate.getConnectionFactory().createConnection().createSession();
        log.info("Consumer creation.");
        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(activeMQQueue);
        log.info("Consume Message");
        log.info(consumer.receive(100L));
    }
}
Log output:
02 Mar 2021 16:48:34,298 [ INFO] --- o.a.a.b.BrokerService : Using Persistence Adapter: MemoryPersistenceAdapter
02 Mar 2021 16:48:34,438 [ INFO] --- o.a.a.b.BrokerService : Apache ActiveMQ 5.16.1 (localhost, ID:devbox-44103-1614703714311-0:1) is starting
02 Mar 2021 16:48:34,442 [DEBUG] --- o.a.a.b.j.Log4JConfigView : Could not locate log4j classes on classpath.
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService : Apache ActiveMQ 5.16.1 (localhost, ID:devbox-44103-1614703714311-0:1) started
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService : For help or more information please see: http://activemq.apache.org
02 Mar 2021 16:48:34,445 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.MasterBroker
02 Mar 2021 16:48:34,452 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ BrokerService[localhost] Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@55bf08a5[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,456 [DEBUG] --- o.a.a.t.v.VMTransportFactory : binding to broker: localhost
02 Mar 2021 16:48:34,459 [ INFO] --- o.a.a.b.TransportConnector : Connector vm://localhost started
02 Mar 2021 16:48:34,463 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#0] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@297db6ad[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,472 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#1] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@170437d4[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
02 Mar 2021 16:48:34,474 [DEBUG] --- o.a.a.b.TransportConnection : Setting up new connection id: ID:devbox-44103-1614703714311-4:1, address: vm://localhost#0, info: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:devbox-44103-1614703714311-4:1, clientId = ID:devbox-44103-1614703714311-3:1, clientIp = null, userName = admin, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faultTolerant = false, failoverReconnect = false}
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Connection
02 Mar 2021 16:48:34,480 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding consumer: ID:devbox-44103-1614703714311-4:1:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
02 Mar 2021 16:48:34,514 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,529 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE, subscriptions=0, memory=0%, size=0, pending=0 toPageIn: 0, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 0, dequeueCount: 0, memUsage:0, maxPageSize:200
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Queue
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Producer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,534 [ INFO] --- m.a.w.c.SessionActiveMQIT : Sending to Queue.
02 Mar 2021 16:48:34,535 [DEBUG] --- o.a.a.TransactionContext : Begin:TX:ID:devbox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.ActiveMQSession : ID:devbox-44103-1614703714311-4:1:1 Transaction Commit :TX:ID:devbox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.TransactionContext : Commit: TX:ID:devbox-44103-1614703714311-4:1:1 syncCount: 0
02 Mar 2021 16:48:34,539 [DEBUG] --- o.a.a.t.LocalTransaction : commit: TX:ID:devbox-44103-1614703714311-4:1:1 syncCount: 1
02 Mar 2021 16:48:34,540 [DEBUG] --- o.a.a.b.r.Queue : localhost Message ID:devbox-44103-1614703714311-4:1:1:1:1 sent to queue://TEST_QUEUE
02 Mar 2021 16:48:34,541 [ INFO] --- m.a.w.c.SessionActiveMQIT : Committed and Closed.
02 Mar 2021 16:48:34,541 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE, subscriptions=0, memory=0%, size=1, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1038, maxPageSize:200
02 Mar 2021 16:48:34,545 [ INFO] --- m.a.w.c.SessionActiveMQIT : Create session from conn factory.
02 Mar 2021 16:48:34,545 [DEBUG] --- o.a.a.b.j.ManagementContext : Unregistering MBean org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_QUEUE,endpoint=Producer,clientId=ID_devbox-44103-1614703714311-3_1,producerId=ID_devbox-44103-1614703714311-4_1_1_1
02 Mar 2021 16:48:34,546 [ INFO] --- m.a.w.c.SessionActiveMQIT : Consumer creation.
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,552 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding consumer: ID:devbox-44103-1614703714311-4:1:2:1 for destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,558 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE add sub: QueueSubscription: consumer=ID:devbox-44103-1614703714311-4:1:2:1, destinations=0, dispatched=0, delivered=0, pending=0, prefetch=1000, prefetchExtension=0, dequeues: 0, dispatched: 0, inflight: 0
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1038, maxPageSize:200
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,562 [ INFO] --- m.a.w.c.SessionActiveMQIT : Consume Message
02 Mar 2021 16:48:34,662 [ INFO] --- m.a.w.c.SessionActiveMQIT : null
I think you need to call start() on your instance of javax.jms.Connection before messages will flow to the consumer, e.g.:
final Connection connection = jmsTemplate.getConnectionFactory().createConnection();
final Session session = connection.createSession();
connection.start();
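Also, be sure to close your resources (i.e. connection, session, consumer) when you're done with them. Right now they simply go out of scope, which means they are being leaked. I realize this is just a test, but it's still good practice.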
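To make the two points above concrete, here is a toy model in plain Java. It is not ActiveMQ or JMS code: ToyConnection and StoppedConnectionDemo are hypothetical names invented for this sketch, and the in-memory queue stands in for the broker. It only mimics the JMS rule that a connection is created in stopped mode, so a receive with a timeout gives back null until start() has been called, and it uses try-with-resources so the connection is closed rather than leaked.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model only: mimics the JMS rule that a new connection is stopped until start(). */
public class StoppedConnectionDemo {

    /** Hypothetical stand-in for a connection plus consumer; not a JMS or ActiveMQ class. */
    static final class ToyConnection implements AutoCloseable {
        private final BlockingQueue<String> queue;
        private volatile boolean started = false; // created in stopped mode, as in JMS
        private volatile boolean closed = false;

        ToyConnection(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        void start() {
            started = true;
        }

        /** Returns the next message, or null if nothing is delivered within the timeout. */
        String receive(long timeoutMillis) {
            if (closed) {
                throw new IllegalStateException("connection is closed");
            }
            try {
                if (!started) {
                    // Delivery is paused: wait out the timeout and hand back nothing.
                    Thread.sleep(timeoutMillis);
                    return null;
                }
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return null;
            }
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> testQueue = new LinkedBlockingQueue<>();
        testQueue.add("MESSAGE"); // the producer side has already committed its send

        // try-with-resources closes the connection even if receive() throws.
        try (ToyConnection connection = new ToyConnection(testQueue)) {
            System.out.println("before start: " + connection.receive(100L)); // before start: null
            connection.start();
            System.out.println("after start: " + connection.receive(100L));  // after start: MESSAGE
        }
    }
}
```

In the real test the same shape would apply: keep a reference to the Connection, call start() on it before consumer.receive(...), and close the consumer, session and connection in a finally block once the assertion is done.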