在 Apache Artemis 中为新主题订阅者提供旧消息

Prime new topic subscribers with old messages in Apache Artemis

我正在配置 Apache Artemis 消息代理。代理将接受大文件,下游消费者访问主题以处理最新文件。现在我想知道如何使最新文件可用于开发运行。由于消息一天只到达几次,因此测试运行需要访问最后几条已发送的消息而不能等待下一条。

对于生产和暂存系统,我发现持久订阅工作正常。我已经改编了 Apache Camel 配置作为说明。这里有两个接收消息的消费者,每个消费者都使用持久订阅:

<route id="inbox">
    <from uri="file:inbox"/>
    <to uri="activemq:topic:testing"/>
</route>

<route id="outbox-staging">
    <from uri="activemq:topic:testing?clientId=staging&amp;durableSubscriptionName=staging"/>
    <to uri="file:outbox-staging"/>
</route>
<route id="outbox-production">
    <from uri="activemq:topic:testing?clientId=production&amp;durableSubscriptionName=production"/>
    <to uri="file:outbox-production"/>
</route>

这很好。如果消费者处于离线状态,它将在重新在线时接收消息。现在如果另一个消费者加入测试;

<route id="outbox-testing" streamCache="true">
    <from uri="activemq:topic:testing?clientId=my-local-consumer&amp;durableSubscriptionName=my-local-consumer"/>
    <to uri="file:outbox-local"/>
</route>

因为订阅之前不存在,消费者将不得不等待新消息。我正在寻找的是新订阅者可以立即收到可用消息。我发现了这个概念的不同名称,例如 prefetchPolicyconsumerWindowSize 或“retroactive consumer”。但我不清楚哪些术语适用于 Apache Artemis 以及如何设置它们,因为这些示例主要是指 Apache ActiveMQ。

如何配置 Artemis 以便加入新订阅的消费者获得过去的消息?

prefetchPolicy 不适用于 ActiveMQ Artemis。适用于 ActiveMQ 5.x.

consumerWindowSize 确实适用于 ActiveMQ Artemis。

然而,prefetchPolicyconsumerWindowSize 都不适用于这种情况,因为它们都与 "flow control" 相关并且与将 "missed" 消息放到JMS 主题订阅。

"retroactive consumer" 功能适用于 ActiveMQ 5.x。类似的功能(称为 "retroactive address")将在 ActiveMQ Artemis 2.11 中可用。它是作为 ARTEMIS-2504 的一部分实现的。

因此您有几个选择:

  1. 等待ActiveMQ Artemis 2.11 发布(应该在一月份发布)。
  2. 基于包含追溯地址功能的 master 分支构建您自己的 ActiveMQ Artemis 版本。
  3. 修改您的测试环境,使新订阅者不必等待消息太久(例如更频繁地发送消息)。