ActiveMQ Artemis cluster does not redistribute messages after one instance crashes

I have an Artemis cluster in Kubernetes with 3 master/slave pairs:

activemq-artemis-master-0                               1/1     Running
activemq-artemis-master-1                               1/1     Running
activemq-artemis-master-2                               1/1     Running
activemq-artemis-slave-0                                0/1     Running
activemq-artemis-slave-1                                0/1     Running
activemq-artemis-slave-2                                0/1     Running

I'm using a Spring Boot JmsListener to consume messages sent to a wildcard queue, as shown below:

    @Component
    @Log4j2
    public class QueueListener {
      @Autowired
      private ListenerControl listenerControl;
    
      @JmsListener(id = "queueListener0", destination = "QUEUE.service2.*.*.*.notification")
      public void add(String message, @Header("sentBy") String sentBy, @Header("sentFrom") String sentFrom, @Header("sentAt") Long sentAt) throws InterruptedException {

        log.info("---QUEUE[notification]:  message={}, sentBy={}, sentFrom={}, sentAt={}",
                    message, sentBy, sentFrom, sentAt);
    
        TimeUnit.MILLISECONDS.sleep(listenerControl.getDuration());
      }
    }

20 messages were sent to the queue, and master-1 was the delivering node. After 5 messages had been consumed I killed the master-1 node to simulate a crash. I saw slave-1 become live and then fail back to master-1 once Kubernetes respawned it. The listener threw a JMSException because the connection was lost and then tried to reconnect. I then saw it connect successfully to master-0 (the queue was created there and its consumer count was > 0). However, the queue on master-0 was empty, while the same queue on master-1 still had 15 messages and no consumers attached to it. I waited for a while, but the 15 messages were never delivered. I'm not sure why redistribution didn't kick in.

These are the attributes of the wildcard queue on master-1 after it came back online following the crash (I manually replaced the value of the accessToken field because it contains sensitive information):

Attribute   Value
Acknowledge attempts    0
Address QUEUE.service2.*.*.*.notification
Configuration managed   false
Consumer count  0
Consumers before dispatch   0
Dead letter address DLQ
Delay before dispatch   -1
Delivering count    0
Delivering size 0
Durable true
Durable delivering count    0
Durable delivering size 0
Durable message count   15
Durable persistent size 47705
Durable scheduled count 0
Durable scheduled size  0
Enabled true
Exclusive   false
Expiry address  ExpiryQueue
Filter  
First message age   523996
First message as json   [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":68026,"sentAt":1621957145988,"accessToken":"REMOVED","type":3,"priority":4,"userID":"ID:56c7b509-bd6f-11eb-a348-de0dacf99072","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"e4469ea3-bd62-11eb-a348-de0dacf99072","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"c329c733-1170-440a-9080-992a009d87a9","expiration":0,"timestamp":1621957145988}]
First message timestamp 1621957145988
Group buckets   -1
Group count 0
Group first key 
Group rebalance false
Group rebalance pause dispatch  false
Id  119
Last value  false
Last value key  
Max consumers   -1
Message count   15
Messages acknowledged   0
Messages added  15
Messages expired    0
Messages killed 0
Name    QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.*.*.*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.*.*.*.notification"
Paused  false
Persistent size 47705
Prepared transaction message count  0
Purge on no consumers   false
Retroactive resource    false
Ring size   -1
Routing type    ANYCAST
Scheduled count 0
Scheduled size  0
Temporary   false
User    f7bcdaed-8c0c-4bb5-ad03-ec06382cb557

These are the attributes of the wildcard queue on master-0:

Attribute   Value
Acknowledge attempts    0
Address QUEUE.service2.*.*.*.notification
Configuration managed   false
Consumer count  3
Consumers before dispatch   0
Dead letter address DLQ
Delay before dispatch   -1
Delivering count    0
Delivering size 0
Durable true
Durable delivering count    0
Durable delivering size 0
Durable message count   0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size  0
Enabled true
Exclusive   false
Expiry address  ExpiryQueue
Filter  
First message age   
First message as json   [{}]
First message timestamp 
Group buckets   -1
Group count 0
Group first key 
Group rebalance false
Group rebalance pause dispatch  false
Id  119
Last value  false
Last value key  
Max consumers   -1
Message count   0
Messages acknowledged   0
Messages added  0
Messages expired    0
Messages killed 0
Name    QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.*.*.*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.*.*.*.notification"
Paused  false
Persistent size 0
Prepared transaction message count  0
Purge on no consumers   false
Retroactive resource    false
Ring size   -1
Routing type    ANYCAST
Scheduled count 0
Scheduled size  0
Temporary   false
User    f7bcdaed-8c0c-4bb5-ad03-ec06382cb557

The Artemis version in use is 2.17.0. Here is the cluster configuration from my master-0 broker.xml. The configuration is identical on the other brokers, except that the connector-ref is changed to match each broker:

<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
  <core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
    <name>activemq-artemis-master-0</name>
    <persistence-enabled>true</persistence-enabled>
    <journal-type>ASYNCIO</journal-type>
    <paging-directory>data/paging</paging-directory>
    <bindings-directory>data/bindings</bindings-directory>
    <journal-directory>data/journal</journal-directory>
    <large-messages-directory>data/large-messages</large-messages-directory>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-device-block-size>4096</journal-device-block-size>
    <journal-file-size>10M</journal-file-size>
    <journal-buffer-timeout>100000</journal-buffer-timeout>
    <journal-max-io>4096</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>2244000</page-sync-timeout>
    <acceptors>
      <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
      <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
      <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
      <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
      <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
    </acceptors>
    <security-settings>
      <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <permission type="manage" roles="amq"/>
      </security-setting>
    </security-settings>
    <address-settings>
      <address-setting match="activemq.management#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
      </address-setting>
      <address-setting match="#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redistribution-delay>60000</redistribution-delay>
        <redelivery-delay>0</redelivery-delay>
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
      </address-setting>
    </address-settings>
    <addresses>
      <address name="DLQ">
        <anycast>
          <queue name="DLQ"/>
        </anycast>
      </address>
      <address name="ExpiryQueue">
        <anycast>
          <queue name="ExpiryQueue"/>
        </anycast>
      </address>
    </addresses>
    <cluster-user>clusterUser</cluster-user>
    <cluster-password>aShortclusterPassword</cluster-password>
    <connectors>
      <connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-slave-0">tcp://activemq-artemis-slave-0.activemq-artemis-slave.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-slave-1">tcp://activemq-artemis-slave-1.activemq-artemis-slave.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-master-2">tcp://activemq-artemis-master-2.activemq-artemis-master.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-slave-2">tcp://activemq-artemis-slave-2.activemq-artemis-slave.svc.cluster.local:61616</connector>
    </connectors>
    <cluster-connections>
      <cluster-connection name="activemq-artemis">
        <connector-ref>activemq-artemis-master-0</connector-ref>
        <retry-interval>500</retry-interval>
        <retry-interval-multiplier>1.1</retry-interval-multiplier>
        <max-retry-interval>5000</max-retry-interval>
        <initial-connect-attempts>-1</initial-connect-attempts>
        <reconnect-attempts>-1</reconnect-attempts>
        <message-load-balancing>ON_DEMAND</message-load-balancing>
        <max-hops>1</max-hops>
        <!-- scale-down>true</scale-down -->
        <static-connectors>
          <connector-ref>activemq-artemis-master-0</connector-ref>
          <connector-ref>activemq-artemis-slave-0</connector-ref>
          <connector-ref>activemq-artemis-master-1</connector-ref>
          <connector-ref>activemq-artemis-slave-1</connector-ref>
          <connector-ref>activemq-artemis-master-2</connector-ref>
          <connector-ref>activemq-artemis-slave-2</connector-ref>
        </static-connectors>
      </cluster-connection>
    </cluster-connections>
    <ha-policy>
      <replication>
        <master>
          <group-name>activemq-artemis-0</group-name>
          <quorum-vote-wait>12</quorum-vote-wait>
          <vote-on-replication-failure>true</vote-on-replication-failure>
          <!--we need this for auto failback-->
          <check-for-live-server>true</check-for-live-server>
        </master>
      </replication>
    </ha-policy>
  </core>
  <core xmlns="urn:activemq:core">
    <jmx-management-enabled>true</jmx-management-enabled>
  </core>
</configuration>

From another answer on Stack Overflow I learned that my HA topology is redundant, and I intend to remove the slaves. However, I don't think the slaves are the reason message redistribution isn't working. Is there some configuration I'm missing to handle the crash of an Artemis node?

Update 1: As Justin suggested, I tried a cluster of 2 Artemis nodes without HA.

activemq-artemis-master-0                              1/1     Running            0          27m
activemq-artemis-master-1                              1/1     Running            0          74s

Below is the broker.xml for the 2 Artemis nodes. The only differences between them are the node name and the journal buffer timeout:

<?xml version="1.0"?>

<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
  <core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
    <name>activemq-artemis-master-0</name>
    <persistence-enabled>true</persistence-enabled>
    <journal-type>ASYNCIO</journal-type>
    <paging-directory>data/paging</paging-directory>
    <bindings-directory>data/bindings</bindings-directory>
    <journal-directory>data/journal</journal-directory>
    <large-messages-directory>data/large-messages</large-messages-directory>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-device-block-size>4096</journal-device-block-size>
    <journal-file-size>10M</journal-file-size>
    <journal-buffer-timeout>100000</journal-buffer-timeout>
    <journal-max-io>4096</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>2244000</page-sync-timeout>
    <acceptors>
      <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
      <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
      <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
      <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
      <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
    </acceptors>
    <security-settings>
      <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <permission type="manage" roles="amq"/>
      </security-setting>
    </security-settings>
    <cluster-user>ClusterUser</cluster-user>
    <cluster-password>longClusterPassword</cluster-password>
    <connectors>
      <connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
      <connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
    </connectors>
    <cluster-connections>
      <cluster-connection name="activemq-artemis">
        <connector-ref>activemq-artemis-master-0</connector-ref>
        <retry-interval>500</retry-interval>
        <retry-interval-multiplier>1.1</retry-interval-multiplier>
        <max-retry-interval>5000</max-retry-interval>
        <initial-connect-attempts>-1</initial-connect-attempts>
        <reconnect-attempts>-1</reconnect-attempts>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>ON_DEMAND</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
          <connector-ref>activemq-artemis-master-0</connector-ref>
          <connector-ref>activemq-artemis-master-1</connector-ref>
        </static-connectors>
      </cluster-connection>
    </cluster-connections>
    <address-settings>
      <address-setting match="activemq.management#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
      </address-setting>
      <address-setting match="#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redistribution-delay>60000</redistribution-delay>
        <redelivery-delay>0</redelivery-delay>
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
      </address-setting>
    </address-settings>
    <addresses>
      <address name="DLQ">
        <anycast>
          <queue name="DLQ"/>
        </anycast>
      </address>
      <address name="ExpiryQueue">
        <anycast>
          <queue name="ExpiryQueue"/>
        </anycast>
      </address>
    </addresses>
  </core>
  <core xmlns="urn:activemq:core">
    <jmx-management-enabled>true</jmx-management-enabled>
  </core>
</configuration>

With this setup I still get the same result: after an Artemis node crashes and comes back, the leftover messages are not moved to the other node.

Update 2: I tried using a non-wildcard queue as Justin suggested, but the behavior is still the same. One difference I noticed is that with a non-wildcard queue the consumer count on the queue is only 1, compared to 3 in the wildcard case. Here are the attributes of the old queue after the crash:

Acknowledge attempts    0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed   false
Consumer count  0
Consumers before dispatch   0
Dead letter address DLQ
Delay before dispatch   -1
Delivering count    0
Delivering size 0
Durable true
Durable delivering count    0
Durable delivering size 0
Durable message count   15
Durable persistent size 102245
Durable scheduled count 0
Durable scheduled size  0
Enabled true
Exclusive   false
Expiry address  ExpiryQueue
Filter  
First message age   840031
First message as json   [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":8739,"sentAt":1621969900922,"accessToken":"DONOTDISPLAY","type":3,"priority":4,"userID":"ID:09502dc0-bd8d-11eb-b75c-c6609f1332c9","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"c292b418-bd8b-11eb-b75c-c6609f1332c9","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"90b783d0-d9cc-4188-9c9e-3453786b2105","expiration":0,"timestamp":1621969900922}]
First message timestamp 1621969900922
Group buckets   -1
Group count 0
Group first key 
Group rebalance false
Group rebalance pause dispatch  false
Id  606
Last value  false
Last value key  
Max consumers   -1
Message count   15
Messages acknowledged   0
Messages added  15
Messages expired    0
Messages killed 0
Name    QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused  false
Persistent size 102245
Prepared transaction message count  0
Purge on no consumers   false
Retroactive resource    false
Ring size   -1
Routing type    ANYCAST
Scheduled count 0
Scheduled size  0
Temporary   false
User    6e25e08b-9587-40a3-b7e9-146360539258

And here are the attributes of the new queue:

Attribute   Value
Acknowledge attempts    0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed   false
Consumer count  1
Consumers before dispatch   0
Dead letter address DLQ
Delay before dispatch   -1
Delivering count    0
Delivering size 0
Durable true
Durable delivering count    0
Durable delivering size 0
Durable message count   0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size  0
Enabled true
Exclusive   false
Expiry address  ExpiryQueue
Filter  
First message age   
First message as json   [{}]
First message timestamp 
Group buckets   -1
Group count 0
Group first key 
Group rebalance false
Group rebalance pause dispatch  false
Id  866
Last value  false
Last value key  
Max consumers   -1
Message count   0
Messages acknowledged   0
Messages added  0
Messages expired    0
Messages killed 0
Name    QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused  false
Persistent size 0
Prepared transaction message count  0
Purge on no consumers   false
Retroactive resource    false
Ring size   -1
Routing type    ANYCAST
Scheduled count 0
Scheduled size  0
Temporary   false
User    6e25e08b-9587-40a3-b7e9-146360539258

I took a simplified configuration of just 2 nodes using a non-wildcard queue with a redistribution-delay of 0, and I reproduced the behavior you're seeing on my local machine (i.e. without Kubernetes). I believe I understand why this is happening, but in order to understand the current behavior you first have to understand how redistribution works.

In a cluster, every time a consumer is created the node on which that consumer was created notifies every other node in the cluster about it. If other nodes in the cluster have messages in their corresponding queue but no consumers of their own, those nodes redistribute their messages to the node with the consumer (assuming message-load-balancing is ON_DEMAND and redistribution-delay is >= 0).

However, in your case the node holding the messages is actually down when the consumer is created on the other node, so it never receives the notification about that consumer. Therefore, once that node restarts it doesn't know about the other consumers and doesn't redistribute its messages.

I see you opened ARTEMIS-3321 to enhance the broker to deal with this situation. However, that will take time to develop and release (assuming the change is approved). My recommendation in the meantime would be to configure reconnection on your client, which is discussed in the documentation, e.g.:

    tcp://127.0.0.1:61616?reconnectAttempts=30

With the default retryInterval of 2000 milliseconds, this gives the broker the client was originally connected to 1 minute to come back before the client gives up trying to reconnect and throws an exception, at which point your application can fully re-initialize its connection just as it does now.

Since you're using Spring Boot, be sure to use version 2.5.0, because it includes this change, which lets you specify the full broker URL rather than just a host and port.
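
For illustration, here is a minimal sketch (my own example, not your actual setup; the class name is a placeholder and the host is taken from the connector list in your broker.xml) of how such a URL could be supplied to the ConnectionFactory that the @JmsListener containers use:

    import javax.jms.ConnectionFactory;

    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ArtemisClientConfig {

      @Bean
      public ConnectionFactory connectionFactory() {
        // reconnectAttempts=30 combined with the default retryInterval of 2000 ms
        // gives the original broker roughly 1 minute to come back before the client
        // gives up, throws, and lets the application re-initialize its connection.
        return new ActiveMQConnectionFactory(
            "tcp://activemq-artemis-master-0.activemq-artemis-master.svc.cluster.local:61616"
                + "?reconnectAttempts=30");
      }
    }

If I recall correctly, with Spring Boot 2.5.0 you can alternatively put the same URL (including the reconnection parameters) in the spring.artemis.broker-url property instead of defining the factory yourself.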

Finally, keep in mind that shutting down a node gracefully will short-circuit the client's reconnect attempts and trigger your application to re-initialize its connection, which is not what you want here. Be sure to kill the node ungracefully (e.g. using kill -9 <pid>).