ActiveMQ Last Value Queue 未按预期运行

ActiveMQ Last Value Queue not acting as expected

我正在使用 ActiveMQ Artemis 2.19.1。我使用 Spring Boot 创建了生产者和消费者应用程序。我需要消费者的多个实例来接收所有消息(多播)。我像这样配置了一个最后值队列 (broker.xml):

<address-settings>
   <address-setting match="quote.#">
      <max-size-bytes>1000000000</max-size-bytes> <!-- 1GB -->
      <address-full-policy>BLOCK</address-full-policy>
      <default-last-value-key>symbol</default-last-value-key>
      <default-last-value-queue>true</default-last-value-queue>
      <default-non-destructive>true</default-non-destructive>
   </address-setting>
   ...
</address-settings>

发送是这样的,似乎工作正常。 “符号”是 VLQ 键。

import org.springframework.jms.core.JmsTemplate;

@Service
public class DispatcherService {

@Autowired
JmsTemplate jmsTemplate;

public void sendMessageA(String message) {
    jmsTemplate.convertAndSend(jmsQueue, message, m-> {
        m.setStringProperty("symbol", "ABC");
        return m;
    });
}

如果Spring启动applicaiton.properties有:

spring.jms.pub-sub-domain=true

...然后所有客户端在发布时都会收到所有消息(好)。但是,当新客户启动并订阅该主题时,不会向他们发布最新消息。

如果改为使用:

spring.jms.pub-sub-domain=false

我可以看到最后一条消息保留在最后一个值队列中(良好)并且正在连接的消费者获取最后一条消息。但是,随着消息的发布,它们会循环分发(任播),而不是所有消息都分发给所有消费者。

我如何确保连接到 LVQ 的客户端收到的是最新消息,然后是所有未来消息,而不仅仅是未来消息的循环分发?

编辑:

这样做有效。只需要留下spring.jms.pub-sub-domain=true并设置retroactive-message-count大于可能遇到的符号数否则有些不会保留:

<address-setting match="quotes">
  <retroactive-message-count>100000</retroactive-message-count>
</address-setting>

<address-setting match="*.*.*.quotes.*.retro">
  <default-last-value-key>symbol</default-last-value-key>
</address-setting>

在我看来一切都在按设计工作。我相信您的期望受到挫败,因为您正在使用 pub/sub(即 JMS 主题)。

让我提供一些背景知识。当 JMS 客户端创建对主题的订阅时,代理通过在具有相同名称的地址上创建多播队列来响应。队列根据订阅的类型命名。如果它是 non-durable 订阅,则队列以 UUID 命名。如果它是持久订阅,则队列根据客户端提供的订阅名称和客户端 ID(如果可用)命名。当一条消息被发送到该地址时,它被放入绑定到该地址的所有多播队列中。

因此,当创建新的 non-durable 订阅时,也会为该订阅创建一个 新队列 ,这意味着订阅者将收到 none 在创建订阅之前发送到主题的消息。这是 JMS 主题的预期行为(即正常的 pub/sub 语义)。此外,由于 non-durable 订阅的队列仅在订阅者已连接时 可用 这意味着无法强制执行 LVQ 语义,因为到达队列的任何消息都将是立即发送给消费者。简而言之,LVQ 与 JMS 主题没有多大意义。

当您使用 JMS 队列时行为会发生变化,因为队列总是在那里接收消息。消费者可以随心所欲地进出,而经纪人强制执行 LVQ 语义。

一个可能的解决方案是创建一个特殊的“初始化”队列,消费者可以在其中最初连接以获取最新信息,之后他们可以订阅 JMS 主题以获取您需要的 pub/sub 语义。您可以使用 divert 使发送消息的应用程序透明,以便它们可以继续发送到 JMS 主题。这是示例配置:

<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
   <core xmlns="urn:activemq:core">

      ...      

      <diverts>
         <divert name="myDivert">
            <address>myTopic</address>
            <forwarding-address>initQueue</forwarding-address>
            <exclusive>false</exclusive>
         </divert>
      </diverts>

      ...

      <addresses>
         <address name="myTopic">
            <multicast/>
         </address>
         <address name="initQueue">
            <anycast>
               <queue name="initQueue" last-value-key="symbol" non-destructive="true" />
            </anycast>
         </address>
         ...
      </addresses>
   </core>
</configuration>

使用此配置,发送到 JMS 主题 myTopic 的每条消息也将透明地发送到 initQueue。该队列将仅保留最多 up-to-date 条消息,因为它使用 last-value 语义。此外,那些 up-to-date 消息将 留在队列中 以供任何后续消费者使用,因为队列是 non-destructive.

我预计这里唯一的困难是 Spring,它可能无法为您提供创建初始队列使用者然后创建主题订阅者的灵活性。如果你直接使用JMS API 这将是一件相对简单的事情。

另一个可能的解决方案是使用 retroactive addresses。这里要做的主要事情是确保内部环队列是 LVQ。您可以使用 default-last-value-key address-setting 来做到这一点。请参阅有关要使用的 match 的文档详细信息。