ActiveMQ 中基于内容的消费者优先级

Content-based consumer priority in ActiveMQ

我正在与需要加载大量资源来处理消息内容的消费者合作。同一系列中的消息需要相同的资源。

我尝试使用消息分组来确保将同一系列中的消息提供给已加载资源的使用者。但是我希望消费者优先处理同一系列中的消息,而不是排他性(加载资源需要时间,处理消息需要更长的时间)。例如,如果消费者 1 和 2 可用,则 ID 为 group1 的消息应始终提供给消费者 1,ID 为 group2 的消息应始终提供给消费者 2。如果只有消费者 2 可用,它应该消耗任何消息。这样,消费者 1 对消息 group1 具有优先权,但不是排他性的。同样,消费者 2 对消息具有优先权 group2,但不是独占性。

有没有办法让 AMQ 中的消息分组不给消费者排他性?或者根据内容动态设置消费者优先级的方法?

我正在使用 ActiveMQ 5.16.2,但如果需要的话,我可以切换到 Artemis。

消息分组的重点是序列化消息的消费。最简单的方法是确保组中的所有消息都发送给同一个消费者。因此,消息分组不可能不给予消费者排他性,因为这首先会破坏分组的目的。也就是说,单个消费者 可以 同时接收来自多个组的消息。当组多于消费者时会发生这种情况。

还值得注意的是,您无法控制哪个消费者消费哪个组。分组的要点只是 one 消费者从组中获取消息,而不是 which one.

消费者优先级由消费者自己设置,如the documentation所述。它不是基于正在发送的消息的内容。

您可以尝试在消息上设置 属性 以反映他们属于哪个“家庭”,然后您的消费者可以使用选择器来消费他们关心的消息。

请记住,您的客户端应用程序中始终可以有多个使用者。每个客户端可能有两个消费者——一个带有选择器的高优先级消费者和一个没有选择器的低优先级消费者。代理会优先考虑带有选择器的消费者,但如果没有匹配,它将向没有选择器的消费者发送消息。

在此用例中,我建议过滤消息组。我认为您正在寻求对消息流进行分区,其中消息组提供的 竞争消费者 模式不能满足您的需要。如果您要求这些流由特定的消费者处理,那么您最终希望该消费者拥有 HA 备用或并行消费者是很实际的——这将是一个额外的驱动程序来分区或过滤流量。

有两种方法——您可以使用虚拟目标(通过具有过滤功能的 CompositeQueue 等)进行服务器驱动的配置,或者使用虚拟主题和选择器进行客户端驱动的配置。

这是客户端驱动的方法。

  1. 发布到虚拟主题(即 topic://VT.ORDER.EVENT)
  2. 从虚拟消费者队列消费(即 queue://VQ.APP1.VT.ORDER.EVENT、queue://VQ.APP2.VT.ORDER.EVENT)
  3. 让消费者连接选择器

每组消息都会到达自己的队列,每个队列可以有 1 .. n 个消费者。

奖励:您仍然可以让消费者通过注册通配符选择器或直接订阅主题来监控完整的流量。

<destinationInterceptors> 
  <virtualDestinationInterceptor> 
    <virtualDestinations> 
      <virtualTopic name="VT.>" prefix="VQ.*." selectorAware="true"/>   
    </virtualDestinations>
  </virtualDestinationInterceptor> 
</destinationInterceptors>

坚持选择器:

<plugins>
  <virtualSelectorCacheBrokerPlugin persistFile="<some path>${activemq.data}/virtual-topic-selectorcache.data" />
</plugins>

参考:Virtual Topics