Apache ActiveMQ Artemis how to investigate if messages were lost?

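I'm using ActiveMQ Artemis 2.16.0 as my broker and artemis-jms-client-2.16.0.jar as my JMS client. It feels like I'm randomly losing some messages for reasons I don't understand. I've gone through my Java code and haven't found any problems.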

I have this method:

@JmsListener(destination = "${myQueue}", containerFactory = "jmsListenerContainerFactory")
@Override
public void process(Message message) {
    try {
        processMessage(message);
    } catch (Exception ex) {
        LOG.error("Error[...]", ex);
        responseSender.send(otherQueue, message, ex);
    }
}

The processMessage(Message message) method looks like this:

public void processMessage(Message message) throws JMSException, MyBusinessException {
    try {
        byte[] request = message.getBody(byte[].class);
        [...]
        if (!condition) {
            throw new MyBusinessException("error happened");
        }
        }
        [...]
    } finally {
        MDC.remove(ID);
    } 
}
@Bean(name = "jmsListenerContainerFactoryTest")
@Primary
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
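    // Note: because the session is transacted (setSessionTransacted(true)), the
    // acknowledge mode is ignored; messages are acknowledged when the transaction commits.
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setSessionTransacted(true);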
    factory.setConnectionFactory(cachingConnectionFactory());
    return factory;
}
public class MyBusinessException extends Exception {
    private int code;
    [...]
}

broker.xml:

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>
      
      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 2,17 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 490000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>490000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>460000</page-sync-timeout>

      <acceptors>
         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq, admin"/>
         </security-setting>
      </security-settings>
      <connection-ttl-override>60000</connection-ttl-override>
      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
                     <!-- <config-delete-queues>FORCE</config-delete-queues>
                      <config-delete-addresses>FORCE</config-delete-addresses>-->
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
      </address-settings>

      <addresses>
        <address name="MyQueue">
            <anycast>
               <queue name="MyQueue">
               </queue>
            </anycast>
         </address>
         <address name="MyOtherQueue">
            <anycast>
               <queue name="MyOtherQueue" />
            </anycast>
         </address>                                                 
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

   </core>
</configuration>

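If MyBusinessException(...) is thrown, the idea is to catch the exception and send the exact same message to myOtherQueue. If sending that message fails (i.e. an exception occurs), the original message is redelivered, and so on up to 10 times, after which it goes to the DLQ. That is essentially what I see most of the time, but at random moments my logs show only one attempt to resend the message, there is no message in the DLQ, and the receiving side complains that it got no message. It feels like the message was lost. I've examined myOtherQueue with a magnifying glass, so to speak, using both the Artemis console and JmsToolbox, but I only see an empty queue. There are no consumers on this queue.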

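The intention is not to send failed messages to the DLQ but to another queue (myOtherQueue) for later investigation. If the message can't be delivered to that queue, it should end up on the DLQ. At least, that's how I thought it would work.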
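To make the expected behaviour concrete, here is a minimal in-memory Java sketch of the flow described above. It is a toy model, not Artemis or Spring code: `RedeliveryModel` and its members are hypothetical, processing is assumed to always throw the business exception, and `MAX_DELIVERY_ATTEMPTS` assumes the Artemis default `max-delivery-attempts` of 10, since the broker.xml above does not override it. Under these assumptions every message ends up in exactly one place, either myOtherQueue or the DLQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical toy model of the intended flow; no JMS or Artemis classes are used.
public class RedeliveryModel {

    // Assumption: Artemis default max-delivery-attempts (not overridden in broker.xml above).
    static final int MAX_DELIVERY_ATTEMPTS = 10;

    final List<String> otherQueue = new ArrayList<>(); // stands in for MyOtherQueue
    final List<String> dlq = new ArrayList<>();        // stands in for DLQ
    int deliveries = 0;                                // total delivery attempts seen

    /**
     * Delivers one message whose processing always throws the business exception.
     * forwardSucceeds models responseSender.send(...) on the given attempt number:
     * false means the send throws, the exception escapes the listener,
     * the transaction rolls back and the broker redelivers the message.
     */
    void deliver(String body, IntPredicate forwardSucceeds) {
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            deliveries++;
            if (forwardSucceeds.test(attempt)) {
                otherQueue.add(body); // forwarded; the original message is acknowledged
                return;
            }
        }
        // Delivery attempts exhausted: the broker routes the message to the dead-letter-address.
        dlq.add(body);
    }

    public static void main(String[] args) {
        RedeliveryModel alwaysOk = new RedeliveryModel();
        alwaysOk.deliver("m1", attempt -> true);

        RedeliveryModel alwaysFails = new RedeliveryModel();
        alwaysFails.deliver("m2", attempt -> false);

        System.out.println("forward OK:    otherQueue=" + alwaysOk.otherQueue
                + " dlq=" + alwaysOk.dlq + " deliveries=" + alwaysOk.deliveries);
        System.out.println("forward fails: otherQueue=" + alwaysFails.otherQueue
                + " dlq=" + alwaysFails.dlq + " deliveries=" + alwaysFails.deliveries);
    }
}
```

Running `main` reports one delivery with `m1` in otherQueue for the first case, and ten deliveries with `m2` in dlq for the second. A single resend attempt followed by an empty myOtherQueue and an empty DLQ matches neither outcome of this model, which is why the symptom looks like message loss.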

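In the end only a few messages are lost, at random, and that is what I want to understand. How should I investigate Artemis to see whether any messages were lost? Where should I start? What tools should I use?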

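I would start by putting a property on every message so that it can be uniquely identified, and then log that value so you can correlate the client and broker logs later. If you're using JMS, you can use something like this: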

String uuid = java.util.UUID.randomUUID().toString();
message.setStringProperty("UUID", uuid);
logger.info("Sending message with UUID: " + uuid);

Of course, you'll want to log this on the consumer as well, e.g.:

Message message = consumer.receive();
String uuid = message.getStringProperty("UUID");
logger.info("Received message with UUID: " + uuid);

Then you should activate the broker's audit logging, or perhaps use the LoggingActiveMQServerPlugin.

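Once all the logging is in place, just wait until you think you've lost a message, then search the logs for the ID of a message that was sent but not received. Once you have that ID, you can go through the broker logs to see whether the broker received the message properly, whether it was dispatched to a consumer, and so on. This will help you narrow down where the problem lies.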
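The "sent but not received" search can be sketched in plain Java. This is a minimal, hypothetical helper (`UuidCorrelator` is not part of Artemis or JMS). It assumes the client log lines use exactly the two formats from the producer and consumer snippets above, that the producer and consumer logs have been concatenated into one list, and that the broker log (audit log or LoggingActiveMQServerPlugin output) contains the UUID property value somewhere in the relevant lines. The broker line in `main` is an invented placeholder, not real Artemis output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical log-correlation helper; the log formats are assumptions.
public class UuidCorrelator {

    // Must match the text logged by the producer and consumer snippets.
    static final String SENT_MARKER = "Sending message with UUID: ";
    static final String RECEIVED_MARKER = "Received message with UUID: ";

    // Collects the value that follows the marker on every line containing it.
    static Set<String> uuidsAfter(List<String> lines, String marker) {
        Set<String> ids = new LinkedHashSet<>();
        for (String line : lines) {
            int i = line.indexOf(marker);
            if (i >= 0) {
                ids.add(line.substring(i + marker.length()).trim());
            }
        }
        return ids;
    }

    // UUIDs logged as sent but never logged as received.
    static Set<String> sentButNotReceived(List<String> clientLog) {
        Set<String> missing = uuidsAfter(clientLog, SENT_MARKER);
        missing.removeAll(uuidsAfter(clientLog, RECEIVED_MARKER));
        return missing;
    }

    // For each missing UUID, every broker log line that mentions it (empty if none).
    static Map<String, List<String>> brokerTrail(Set<String> missing, List<String> brokerLog) {
        Map<String, List<String>> trail = new LinkedHashMap<>();
        for (String id : missing) {
            List<String> hits = new ArrayList<>();
            for (String line : brokerLog) {
                if (line.contains(id)) {
                    hits.add(line);
                }
            }
            trail.put(id, hits);
        }
        return trail;
    }

    public static void main(String[] args) {
        List<String> clientLog = List.of(
                "INFO Sending message with UUID: 3f1c0d2e-0000-0000-0000-000000000001",
                "INFO Sending message with UUID: 3f1c0d2e-0000-0000-0000-000000000002",
                "INFO Received message with UUID: 3f1c0d2e-0000-0000-0000-000000000001");
        // Placeholder broker line; real audit/plugin output looks different.
        List<String> brokerLog = List.of(
                "broker: ... UUID=3f1c0d2e-0000-0000-0000-000000000002 ...");

        brokerTrail(sentButNotReceived(clientLog), brokerLog)
                .forEach((id, lines) -> System.out.println(id + " -> " + lines));
    }
}
```

Assuming the broker logs every send, an empty list for a UUID suggests the message never reached the broker, while lines showing it was routed and delivered shift suspicion to the consumer side.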