Apache ActiveMQ Artemis: how to investigate whether messages were lost?

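I'm using ActiveMQ Artemis 2.16.0 as my broker and artemis-jms-client-2.16.0.jar as my JMS client. For reasons I don't understand, I seem to be randomly losing some messages. I've gone through my Java code and don't see any exceptions.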


@JmsListener(destination = "${myQueue}", containerFactory = "jmsListenerContainerFactory")
public void process(Message message) {
    try {
        processMessage(message);
    } catch (Exception ex) {
        // Log the failure and forward the original message to the other queue
        LOG.error("Error[...]", ex);
        responseSender.send(otherQueue, message, ex);
    }
}

The processMessage(Message message) method looks like this:

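public void processMessage(Message message) throws JMSException, MyBusinessException {
    try {
        byte[] request = message.getBody(byte[].class);
        if (!condition) { // business check; details elided in the original
            throw new MyBusinessException("error happened");
        }
        // ... further processing of request (elided in the original)
    } finally {
        // ... (elided in the original)
    }
}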
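
@Bean(name = "jmsListenerContainerFactoryTest")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    // Note: this bean is registered as "jmsListenerContainerFactoryTest",
    // while the @JmsListener above refers to "jmsListenerContainerFactory".
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    return factory;
}
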
public class MyBusinessException extends Exception {
    private int code;
    public MyBusinessException(String message) { super(message); }
}


<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <!--
       This value was determined through a calculation.
       Your system could perform 2,17 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 490000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->

      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
      -->

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->

      <!-- should the broker detect dead locks and other issues -->

      <acceptors>
         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq, admin"/>
         </security-setting>
      </security-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <!-- <config-delete-queues>FORCE</config-delete-queues> -->
            <!-- with -1 only the global-max-size is in use for limiting -->
         <!--default for catch all-->
         <address-setting match="#">

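      <addresses>
         <address name="MyQueue">
            <anycast>
               <queue name="MyQueue" />
            </anycast>
         </address>
         <address name="MyOtherQueue">
            <anycast>
               <queue name="MyOtherQueue" />
            </anycast>
         </address>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>
      </addresses>
   </core>
</configuration>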


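If a MyBusinessException is thrown, the idea is to catch it and send the exact same message to myOtherQueue. If sending that message fails (i.e. an exception occurs), the message is redelivered, and so on up to 10 times, after which it goes to the DLQ. That is essentially what I see most of the time, but at random moments my logs show only one attempt to resend the message, there is nothing in the DLQ, and the receiving side complains that there is no message. It feels as if the message was lost. I've gone over myOtherQueue with a magnifying glass, so to speak, using both the Artemis console and JmsToolbox, but all I see is an empty queue. There are no consumers on this queue.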

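The intention is not to send failed messages to the DLQ but to another queue (myOtherQueue) for later investigation. Only if the message cannot be delivered to that queue should it end up on the DLQ. At least, that's how I thought it worked.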
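To make the expected flow explicit, here is a minimal Java model of it. It is only a sketch of the behaviour described above, not of Artemis internals: the class ForwardingModel, the method handleFailedMessage and the constant MAX_DELIVERY_ATTEMPTS are hypothetical, and the model assumes that an exception escaping the listener leads to a redelivery, which in practice depends on the listener container's acknowledgement or transaction settings.

```java
import java.util.function.BooleanSupplier;

public class ForwardingModel {
    // Hypothetical limit mirroring the "up to 10 times" in the description; in Artemis the
    // real value comes from the broker's address settings, not from client code.
    static final int MAX_DELIVERY_ATTEMPTS = 10;

    /** Where a failed message ends up and how many deliveries it took to get there. */
    static final class Outcome {
        final String destination;
        final int attempts;

        Outcome(String destination, int attempts) {
            this.destination = destination;
            this.attempts = attempts;
        }
    }

    /**
     * Each delivery of a message that hit a business error tries to forward it to myOtherQueue.
     * A failed forward makes the listener throw, which (by assumption) triggers a redelivery;
     * once the attempts are exhausted the message is routed to the DLQ.
     */
    static Outcome handleFailedMessage(BooleanSupplier forwardSucceeds) {
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            if (forwardSucceeds.getAsBoolean()) {
                return new Outcome("myOtherQueue", attempt);
            }
        }
        return new Outcome("DLQ", MAX_DELIVERY_ATTEMPTS);
    }

    public static void main(String[] args) {
        Outcome ok = handleFailedMessage(() -> true);
        System.out.println(ok.destination + " after " + ok.attempts + " attempt(s)"); // myOtherQueue after 1 attempt(s)
        Outcome failing = handleFailedMessage(() -> false);
        System.out.println(failing.destination + " after " + failing.attempts + " attempt(s)"); // DLQ after 10 attempt(s)
    }
}
```

In this model, a single forwarding attempt in the logs means the first forward succeeded, so the message should be sitting on myOtherQueue. An empty myOtherQueue together with an empty DLQ is exactly the case the model does not allow, which is the discrepancy worth tracing.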

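At the end of the day only a few messages go missing, at random, and that is what I want to understand. How should I investigate Artemis to find out whether any messages were lost? Where do I start? Which tools should I use?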

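I would start by putting a property on each message so that it can be uniquely identified, and then logging that value so that you can later correlate the client and broker logs. If you're using JMS, you can use something like this: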

// Producer side: tag the message with a unique ID and log it before sending
String uuid = java.util.UUID.randomUUID().toString();
message.setStringProperty("UUID", uuid);
logger.info("Sending message with UUID: " + uuid);


// Consumer side: read the ID back and log it
Message message = consumer.receive();
String uuid = message.getStringProperty("UUID");
logger.info("Received message with UUID: " + uuid);

You should also activate the broker's audit logging, or perhaps use the LoggingActiveMQServerPlugin.

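Once all the logging is in place, simply wait until you think you've lost a message, then search the logs for the ID of a message that was sent but never received. Once you have that ID, you can go through the broker logs to see whether the broker received the message correctly, whether it dispatched it to a consumer, and so on. This will help you narrow down where the problem is.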
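The search for "sent but never received" IDs can be automated. The following is a minimal sketch in plain Java, assuming the client logs contain exactly the "Sending message with UUID:" and "Received message with UUID:" lines produced by the snippets above; any other log format would need different patterns. The class name UuidCorrelator and the method sentButNotReceived are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UuidCorrelator {
    // Patterns for the log lines written by the producer and consumer snippets above
    private static final Pattern SENT = Pattern.compile("Sending message with UUID: (\\S+)");
    private static final Pattern RECEIVED = Pattern.compile("Received message with UUID: (\\S+)");

    /** Returns the UUIDs that were logged as sent but never logged as received, in log order. */
    public static Set<String> sentButNotReceived(List<String> logLines) {
        Set<String> sent = new LinkedHashSet<>();
        Set<String> received = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher s = SENT.matcher(line);
            if (s.find()) {
                sent.add(s.group(1));
            }
            Matcher r = RECEIVED.matcher(line);
            if (r.find()) {
                received.add(r.group(1));
            }
        }
        sent.removeAll(received);
        return sent;
    }

    public static void main(String[] args) throws IOException {
        // Merge all log files given on the command line (e.g. producer and consumer logs)
        List<String> lines = new ArrayList<>();
        for (String path : args) {
            lines.addAll(Files.readAllLines(Paths.get(path)));
        }
        for (String uuid : sentButNotReceived(lines)) {
            System.out.println("Sent but never received: " + uuid);
        }
    }
}
```

Run it as `java UuidCorrelator producer.log consumer.log`; each UUID it prints is a candidate to look up in the broker's audit or plugin log.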