ActiveMQ Artemis cluster does not redistribute messages after one instance crash
I have an Artemis cluster in Kubernetes with 3 groups of master/slave:
activemq-artemis-master-0 1/1 Running
activemq-artemis-master-1 1/1 Running
activemq-artemis-master-2 1/1 Running
activemq-artemis-slave-0 0/1 Running
activemq-artemis-slave-1 0/1 Running
activemq-artemis-slave-2 0/1 Running
I am using a Spring Boot JmsListener to consume messages sent to a wildcard queue, as follows.
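import java.util.concurrent.TimeUnit;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class QueueListener {
    // ListenerControl is an application bean (not shown) that supplies the sleep duration in ms.
    @Autowired
    private ListenerControl listenerControl;

    @JmsListener(id = "queueListener0", destination = "QUEUE.service2.*.*.*.notification")
    public void add(String message, @Header("sentBy") String sentBy, @Header("sentFrom") String sentFrom, @Header("sentAt") Long sentAt) throws InterruptedException {
        log.info("---QUEUE[notification]: message={}, sentBy={}, sentFrom={}, sentAt={}",
                message, sentBy, sentFrom, sentAt);
        // Simulate slow processing so that messages are still queued when the broker is killed.
        TimeUnit.MILLISECONDS.sleep(listenerControl.getDuration());
    }
}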
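For context, here is a rough sketch of how a wildcard like the one above lines up with a concrete address. It assumes Artemis's default wildcard syntax, where `.` separates words and `*` stands for exactly one word. The class and method names are hypothetical, `#` is deliberately not handled, and this is not the broker's own matching code:

```java
public class WildcardSketch {

    // Returns true when every word of the address equals the corresponding
    // word of the wildcard, treating "*" as "any single word".
    static boolean matches(String wildcard, String address) {
        String[] pattern = wildcard.split("\\.");
        String[] words = address.split("\\.");
        if (pattern.length != words.length) {
            return false; // "*" never spans more or fewer than one word
        }
        for (int i = 0; i < pattern.length; i++) {
            if (!pattern[i].equals("*") && !pattern[i].equals(words[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String wildcard = "QUEUE.service2.*.*.*.notification";
        System.out.println(matches(wildcard,
                "QUEUE.service2.tech-drive2.188100000059.thai.notification"));
        System.out.println(matches(wildcard, "QUEUE.service2.thai.notification"));
    }
}
```

The first address has three words between `service2` and `notification`, so it matches; the second has only one, so it does not.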
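There were 20 messages sent to the queue, and master-1 was the delivering node. When 5 messages had been consumed, I killed the master-1 node to simulate a crash. I saw slave-1 start running and then yield back to master-1 after Kubernetes respawned it. The listener threw a JMSException that the connection was lost, and it tried to reconnect. Then I saw it successfully connect to master-0 (I saw the queue created and the consumer count > 0). However, the queue on master-0 was empty, while the same queue on master-1 still had 15 messages and no consumer attached to it. I waited for a while, but the 15 messages were never delivered. I am not sure why redistribution did not kick in.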
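The attributes of the wildcard queue on master-1 when it came back online after the crash were as follows (I manually replaced the value of the accessToken field because it holds sensitive information):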
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 47705
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 523996
First message as json [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":68026,"sentAt":1621957145988,"accessToken":"REMOVED","type":3,"priority":4,"userID":"ID:56c7b509-bd6f-11eb-a348-de0dacf99072","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"e4469ea3-bd62-11eb-a348-de0dacf99072","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"c329c733-1170-440a-9080-992a009d87a9","expiration":0,"timestamp":1621957145988}]
First message timestamp 1621957145988
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.\*.\*.\*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 47705
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
The attributes of the wildcard queue on master-0 were as follows:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 3
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.\*.\*.\*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
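The Artemis version in use is 2.17.0. Here is my cluster config in master-0's broker.xml. The configs are the same for the other brokers, except that connector-ref is changed to match the broker: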
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
<cluster-user>clusterUser</cluster-user>
<cluster-password>aShortclusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-0">tcp://activemq-artemis-slave-0.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-1">tcp://activemq-artemis-slave-1.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-2">tcp://activemq-artemis-master-2.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-2">tcp://activemq-artemis-slave-2.activemq-artemis-slave.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<!-- scale-down>true</scale-down -->
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-slave-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
<connector-ref>activemq-artemis-slave-1</connector-ref>
<connector-ref>activemq-artemis-master-2</connector-ref>
<connector-ref>activemq-artemis-slave-2</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<ha-policy>
<replication>
<master>
<group-name>activemq-artemis-0</group-name>
<quorum-vote-wait>12</quorum-vote-wait>
<vote-on-replication-failure>true</vote-on-replication-failure>
<!--we need this for auto failback-->
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
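From another answer on Stack Overflow, I understand that my topology for high availability is redundant, and I am planning to remove the slaves. However, I don't think the slaves are the reason message redistribution is not working. Is there a config I am missing to handle an Artemis node crash?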
Update 1:
As Justin suggested, I tried using a cluster of 2 Artemis nodes without HA.
activemq-artemis-master-0 1/1 Running 0 27m
activemq-artemis-master-1 1/1 Running 0 74s
Below is the broker.xml of the 2 Artemis nodes. The only differences between them are the node name and journal-buffer-timeout:
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<cluster-user>ClusterUser</cluster-user>
<cluster-password>longClusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
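With this setup I still got the same result: after the Artemis node crashed and came back, the leftover messages were not moved to the other node.
Update 2:
I tried using a non-wildcard queue as Justin suggested, but still got the same behavior. One difference I noticed is that with a non-wildcard queue the consumer count is only 1, compared to 3 in the wildcard case. Here are the attributes of the old queue after the crash: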
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 102245
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 840031
First message as json [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":8739,"sentAt":1621969900922,"accessToken":"DONOTDISPLAY","type":3,"priority":4,"userID":"ID:09502dc0-bd8d-11eb-b75c-c6609f1332c9","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"c292b418-bd8b-11eb-b75c-c6609f1332c9","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"90b783d0-d9cc-4188-9c9e-3453786b2105","expiration":0,"timestamp":1621969900922}]
First message timestamp 1621969900922
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 606
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 102245
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
Here are the attributes of the new queue:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 1
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 866
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
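I've taken your simplified configuration with just 2 nodes, using a non-wildcard queue with a redistribution-delay of 0, and I reproduced the behavior you're seeing on my local machine (i.e. without Kubernetes). I believe I see why the behavior is such, but to make sense of it you first need to understand how redistribution works.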
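In a cluster, every time a consumer is created, the node on which the consumer is created notifies every other node in the cluster about the consumer. If other nodes in the cluster have messages in their corresponding queue but don't have any consumers, then those other nodes redistribute their messages to the node with the consumer (assuming message-load-balancing is ON_DEMAND and redistribution-delay is >= 0).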
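In your case, however, the node with the messages is actually down when the consumer is created on the other node, so it never actually receives the notification about the consumer. Therefore, once that node restarts, it doesn't know about the other consumer and does not redistribute its messages.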
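To make that sequence concrete, here is a toy model of the notification flow. It is not Artemis code: the `Node` and `Cluster` classes are hypothetical, the redistribution delay is ignored, and the only thing it is meant to show is that a notification sent while a node is down is never seen by that node.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedistributionSketch {

    // Hypothetical stand-in for one broker in the cluster.
    static class Node {
        final String name;
        boolean up = true;
        int localMessages;
        int localConsumers;
        // Peers this node has been told have a consumer on the queue.
        final Set<String> peersWithConsumers = new HashSet<>();

        Node(String name, int localMessages) {
            this.name = name;
            this.localMessages = localMessages;
        }
    }

    static class Cluster {
        final List<Node> nodes = new ArrayList<>();

        // A consumer attaches to 'target'; only nodes that are up get the notification.
        void addConsumer(Node target) {
            target.localConsumers++;
            for (Node n : nodes) {
                if (n != target && n.up) {
                    n.peersWithConsumers.add(target.name);
                    redistribute(n);
                }
            }
        }

        // A node with messages and no local consumers forwards them to a known remote consumer.
        void redistribute(Node from) {
            if (from.localConsumers > 0 || from.localMessages == 0) {
                return;
            }
            for (Node to : nodes) {
                if (to.up && from.peersWithConsumers.contains(to.name)) {
                    to.localMessages += from.localMessages;
                    from.localMessages = 0;
                    return;
                }
            }
        }

        void crash(Node n) {
            n.up = false;
            n.localConsumers = 0;
            n.peersWithConsumers.clear();
        }

        void restart(Node n) {
            n.up = true;
            redistribute(n); // nothing to do: it never heard about the remote consumer
        }
    }

    // Returns how many messages are left on master-1 at the end of the scenario.
    static int messagesLeftOnMaster1(boolean crashBeforeConsumerArrives) {
        Cluster cluster = new Cluster();
        Node master0 = new Node("master-0", 0);
        Node master1 = new Node("master-1", 15);
        cluster.nodes.add(master0);
        cluster.nodes.add(master1);

        if (crashBeforeConsumerArrives) {
            cluster.crash(master1);
            cluster.addConsumer(master0); // notification is lost for master-1
            cluster.restart(master1);
        } else {
            cluster.addConsumer(master0); // master-1 is up and gets notified
        }
        return master1.localMessages;
    }

    public static void main(String[] args) {
        System.out.println(messagesLeftOnMaster1(true));  // prints 15
        System.out.println(messagesLeftOnMaster1(false)); // prints 0
    }
}
```

In the first run the 15 messages stay on master-1, which mirrors what you observed; in the second, where master-1 is up when the consumer appears, they move to master-0.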
I see you've opened ARTEMIS-3321 to enhance the broker to deal with this situation. However, that will take time to develop and release (assuming the change is approved). My recommendation in the meantime is to configure your client reconnection, which is discussed in the documentation, e.g.:
tcp://127.0.0.1:61616?reconnectAttempts=30
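Given the default retryInterval of 2000 milliseconds, that will give the broker to which the client was originally connected 1 minute to come back up before the client gives up trying to reconnect and throws an exception, at which point the application can completely re-initialize its connection like it is doing now.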
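The 1 minute figure is simply 30 attempts times 2000 ms. A small helper like the following can be used to estimate the window for other settings. The method name is hypothetical, and the multiplier of 1.0 and maximum interval of 2000 ms passed in `main` are assumptions about the client defaults that you should confirm against the documentation for your version; the time each connection attempt itself takes is ignored.

```java
public class ReconnectWindow {

    // Sums the waits between reconnect attempts, in milliseconds.
    // Each wait grows by 'multiplier' and is capped at 'maxRetryInterval'.
    static long totalWaitMillis(int attempts, long retryInterval,
                                double multiplier, long maxRetryInterval) {
        long total = 0;
        double interval = retryInterval;
        for (int i = 0; i < attempts; i++) {
            total += (long) Math.min(interval, maxRetryInterval);
            interval *= multiplier;
        }
        return total;
    }

    public static void main(String[] args) {
        // reconnectAttempts=30 with a constant 2000 ms interval
        System.out.println(totalWaitMillis(30, 2000, 1.0, 2000)); // prints 60000
    }
}
```

If the broker pod takes longer than that to come back in your Kubernetes environment, raise reconnectAttempts accordingly.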
Since you're using Spring Boot, be sure to use version 2.5.0, as it contains this change, which will allow you to specify the broker URL rather than just host and port.
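Lastly, keep in mind that shutting the node down gracefully will short-circuit the client's reconnect and trigger your application to re-initialize the connection, which is not what we want here. Be sure to kill the node ungracefully (e.g. using kill -9 <pid>).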
我在 Kubernetes 中有一个 Artemis 集群,有 3 组 master/slave:
activemq-artemis-master-0 1/1 Running
activemq-artemis-master-1 1/1 Running
activemq-artemis-master-2 1/1 Running
activemq-artemis-slave-0 0/1 Running
activemq-artemis-slave-1 0/1 Running
activemq-artemis-slave-2 0/1 Running
我正在使用 Spring 引导 JmsListener 来使用发送到通配符队列的消息,如下所示。
@Component
@Log4j2
public class QueueListener {
@Autowired
private ListenerControl listenerControl;
@JmsListener(id = "queueListener0", destination = "QUEUE.service2.*.*.*.notification")
public void add(String message, @Header("sentBy") String sentBy, @Header("sentFrom") String sentFrom, @Header("sentAt") Long sentAt) throws InterruptedException {
log.info("---QUEUE[notification]: message={}, sentBy={}, sentFrom={}, sentAt={}",
message, sentBy, sentFrom, sentAt);
TimeUnit.MILLISECONDS.sleep(listenerControl.getDuration());
}
}
有 20 条消息发送到队列,master-1 是传递节点。当 5 条消息被消耗后,我杀死了 master-1 节点以模拟崩溃,我看到 slave-1 启动 运行 然后在 Kubernetes 重新生成它后返回到 master-1。侦听器抛出 JMSException
连接丢失并尝试重新连接。然后我看到它成功连接到 master-0(我看到创建的队列和消费者计数 > 0)。然而 master-0 上的队列是空的,而 master-1 中的同一个队列仍然有 15 条消息并且没有消费者附加到它。我等了一会儿,但 15 条消息从未送达。我不确定为什么重新分配没有启动。
master-1上的wildcard queue的属性是crash后重新上线时是这样的(我手动替换了accessToken字段的值,因为敏感信息):
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 47705
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 523996
First message as json [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":68026,"sentAt":1621957145988,"accessToken":"REMOVED","type":3,"priority":4,"userID":"ID:56c7b509-bd6f-11eb-a348-de0dacf99072","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"e4469ea3-bd62-11eb-a348-de0dacf99072","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"c329c733-1170-440a-9080-992a009d87a9","expiration":0,"timestamp":1621957145988}]
First message timestamp 1621957145988
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.\*.\*.\*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 47705
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
master-0上通配符队列的属性是这样的:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 3
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.\*.\*.\*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
使用的Artemis版本是2.17.0。这是我在 master-0 broker.xml
中的集群配置。除了 connector-ref
已更改以匹配经纪人外,其他经纪人的配置相同:
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
<cluster-user>clusterUser</cluster-user>
<cluster-password>aShortclusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-0">tcp://activemq-artemis-slave-0.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-1">tcp://activemq-artemis-slave-1.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-2">tcp://activemq-artemis-master-2.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-2">tcp://activemq-artemis-slave-2.activemq-artemis-slave.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<!-- scale-down>true</scale-down -->
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-slave-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
<connector-ref>activemq-artemis-slave-1</connector-ref>
<connector-ref>activemq-artemis-master-2</connector-ref>
<connector-ref>activemq-artemis-slave-2</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<ha-policy>
<replication>
<master>
<group-name>activemq-artemis-0</group-name>
<quorum-vote-wait>12</quorum-vote-wait>
<vote-on-replication-failure>true</vote-on-replication-failure>
<!--we need this for auto failback-->
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
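From another answer on Stack Overflow, I understand that my high-availability topology is redundant, and I plan to remove the slaves. However, I don't think the slaves are the reason message redistribution isn't working. Is there a configuration I'm missing to handle an Artemis node crash?
Update 1: As Justin suggested, I tried a cluster of 2 Artemis nodes without HA.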
activemq-artemis-master-0 1/1 Running 0 27m
activemq-artemis-master-1 1/1 Running 0 74s
Below is the broker.xml of the 2 Artemis nodes. The only differences between them are the node name and the journal buffer timeout:
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<cluster-user>ClusterUser</cluster-user>
<cluster-password>longClusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
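With this setup I still get the same result: after the Artemis node crashes and comes back up, the leftover messages are not moved to the other node.
Update 2: I tried using a non-wildcard queue as Justin suggested, but I still see the same behavior. One difference I noticed is that with the non-wildcard queue the consumer count is only 1, compared to 3 in the wildcard case.
Here are the attributes of the old queue after the crash: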
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 102245
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 840031
First message as json [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":8739,"sentAt":1621969900922,"accessToken":"DONOTDISPLAY","type":3,"priority":4,"userID":"ID:09502dc0-bd8d-11eb-b75c-c6609f1332c9","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"c292b418-bd8b-11eb-b75c-c6609f1332c9","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"90b783d0-d9cc-4188-9c9e-3453786b2105","expiration":0,"timestamp":1621969900922}]
First message timestamp 1621969900922
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 606
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 102245
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
Here are the attributes of the new queue:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 1
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 866
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.tech-drive2.188100000059.thai.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
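I took your simplified configuration with just 2 nodes, using a non-wildcard queue with a redistribution-delay of 0, and I reproduced the behavior you're seeing on my local machine (i.e. without Kubernetes). I believe I see why the behavior is what it is, but to understand it you first have to understand how redistribution works.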
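In a cluster, every time a consumer is created, the node on which the consumer is created notifies every other node in the cluster about the consumer. If other nodes in the cluster have messages in their corresponding queue but don't have any consumers, then those other nodes redistribute their messages to the node with the consumer (assuming message-load-balancing is ON_DEMAND and redistribution-delay is >= 0).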
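However, in your case the node with the messages is actually down when the consumer is created on the other node, so it never receives the notification about the consumer. Therefore, once that node restarts it doesn't know about the other consumer and does not redistribute its messages.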
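To make the sequence of events concrete, here is a toy model of the notification-driven behavior described above. It is only a sketch: the class and method names (ToyCluster, createConsumer, restart and so on) are hypothetical and do not mirror Artemis internals, and it ignores redistribution-delay entirely. It assumes, as described, that a consumer notification is sent once at creation time and is not replayed to a node that was down.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: names are hypothetical and do not mirror Artemis internals.
final class ToyCluster {

    static final class Node {
        final String name;
        boolean up = true;
        int queuedMessages = 0;
        int localConsumers = 0;
        // Remote consumers this node has been notified about (node name -> count).
        final Map<String, Integer> knownRemoteConsumers = new HashMap<>();

        Node(String name) { this.name = name; }
    }

    final List<Node> nodes = new ArrayList<>();

    Node addNode(String name) {
        Node n = new Node(name);
        nodes.add(n);
        return n;
    }

    // Creating a consumer notifies every other node that is up right now.
    void createConsumer(Node target) {
        target.localConsumers++;
        for (Node other : nodes) {
            if (other != target && other.up) {
                other.knownRemoteConsumers.merge(target.name, 1, Integer::sum);
                redistribute(other);
            }
        }
    }

    // A node forwards its messages only if it has no local consumer
    // and has been told about a consumer on another live node.
    void redistribute(Node from) {
        if (!from.up || from.localConsumers > 0 || from.queuedMessages == 0) return;
        for (Node to : nodes) {
            if (to != from && to.up
                    && from.knownRemoteConsumers.getOrDefault(to.name, 0) > 0) {
                to.queuedMessages += from.queuedMessages;
                from.queuedMessages = 0;
                return;
            }
        }
    }

    // The node comes back with its persisted messages, but (assumption of this
    // model) notifications sent while it was down are never replayed.
    void restart(Node n) {
        n.up = true;
        n.knownRemoteConsumers.clear();
        redistribute(n);
    }

    // Scenario from the question: master-1 is down when the consumer appears.
    static int strandedAfterCrash() {
        ToyCluster c = new ToyCluster();
        Node m0 = c.addNode("master-0");
        Node m1 = c.addNode("master-1");
        m1.queuedMessages = 15;
        m1.up = false;         // master-1 crashes
        c.createConsumer(m0);  // listener reconnects to master-0
        c.restart(m1);         // master-1 comes back
        return m1.queuedMessages;
    }

    // Control case: master-1 stays up, so it hears about the consumer.
    static int strandedWithoutCrash() {
        ToyCluster c = new ToyCluster();
        Node m0 = c.addNode("master-0");
        Node m1 = c.addNode("master-1");
        m1.queuedMessages = 15;
        c.createConsumer(m0);
        return m1.queuedMessages;
    }

    public static void main(String[] args) {
        System.out.println("stranded after crash: " + strandedAfterCrash());
        System.out.println("stranded without crash: " + strandedWithoutCrash());
    }
}
```

In this model the 15 messages stay on master-1 in the crash scenario and move to master-0 in the control case, which matches what you observed.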
I see you've opened ARTEMIS-3321 to enhance the broker to deal with this situation. However, that will take time to develop and release (assuming the change is approved). My recommendation in the meantime would be to configure client reconnection, which is discussed in the documentation, e.g.:
tcp://127.0.0.1:61616?reconnectAttempts=30
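Given the default retryInterval of 2000 milliseconds, that will give the broker to which the client was originally connected 1 minute to come back up before the client gives up trying to reconnect and throws an exception. At that point the application can completely re-initialize its connection, as it is doing now.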
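The 1 minute figure is just reconnectAttempts multiplied by retryInterval. As a rough sketch of that arithmetic, the helper below (ReconnectWindow is a hypothetical name, not an Artemis class) sums the wait before each attempt. It assumes retryIntervalMultiplier and maxRetryInterval are left at what I believe are the client defaults (1.0 and 2000 ms), and it only approximates the client's real timing.

```java
import java.time.Duration;

// Illustrative helper, not part of Artemis: estimates how long a client
// keeps retrying before it gives up.
final class ReconnectWindow {

    // Sums the wait before each attempt. The interval grows by `multiplier`
    // after every attempt and is capped at maxRetryIntervalMs.
    static Duration total(int reconnectAttempts, long retryIntervalMs,
                          double multiplier, long maxRetryIntervalMs) {
        long totalMs = 0;
        double interval = retryIntervalMs;
        for (int i = 0; i < reconnectAttempts; i++) {
            totalMs += Math.min((long) interval, maxRetryIntervalMs);
            interval *= multiplier;
        }
        return Duration.ofMillis(totalMs);
    }

    public static void main(String[] args) {
        // reconnectAttempts=30 comes from the URL above; the remaining
        // values are the assumed defaults.
        System.out.println(total(30, 2000, 1.0, 2000).getSeconds()); // prints 60
    }
}
```

If the broker pod typically takes longer than that to come back in your Kubernetes environment, raise reconnectAttempts accordingly.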
Since you're using Spring Boot, be sure to use version 2.5.0, as it contains this change, which will allow you to specify the broker URL rather than just the host and port.
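Finally, keep in mind that shutting the node down gracefully will short-circuit the client's reconnect and trigger your application to re-initialize the connection, which isn't what we want here. Be sure to kill the node ungracefully (e.g. using kill -9 <pid>).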