使用 RabbitMQ 根据键(例如 id)确定并发性

Determine concurrency based on a key (such as an id) with RabbitMQ

我们正在使用 RabbitMQ 和 Spring 的侦听器容器构建 Web 应用程序以产生并发性,如下所示:

<rabbit:listener-container connection-factory="connectionFactory" concurrency="10">
      <rabbit:listener ref="FooService" method="handleFoo" queue-names="fooQueue"/>
</rabbit:listener-container>

<rabbit:topic-exchange name="exchange">
    <rabbit:bindings>
        <rabbit:binding queue="fooQueue" pattern="foo.handle"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

我希望侦听器并发处理消息(例如,本例中有 10 个线程),但我不希望它们同时处理具有相同数据的消息。例如,如果我发送 Foo 对象的 id,我只希望同时处理不同的 Foo 对象,但应该顺序处理相同的 Foo 对象。

我已经查看了 RabbitMQ 的交换和队列类型,但无法想出如何使用它们中的任何一个。

我能想到的一种方法是创建多个具有不同模式的队列,例如 foo.handle.1foo.handle.2 等等。然后将 Foo 对象的 id 哈希到这些模式。但是,对我们拥有的每种类型的队列执行此操作并管理所有队列很容易失控。

是否有一种机制可以使用 RabbitMQ 实现此目的?

与 JMS 不同,RabbitMQ(或 AMQP 本身)没有消息选择器的概念 - 您无法从队列中提取选择性消息。

RabbitMQ 的唯一解决方案是为每种类型创建一个单独的队列,并在每个队列上使用一个消费者。