Spring 带有预取的 AMQP 单一消费者并行性

Spring AMQP single consumer parallelism with prefetch

我们有一个项目正在使用 Spring-AMQP 来使用来自我们的 RabbitMQ 代理的消息。我们想增加消费端的并发度,以便多个工作线程可以并行处理消息。我首先阅读了本机 RabbitMQ 客户端的文档,这引导我设计使用单个消费者,并且预取计数 > 1 以控制并行性。直接使用 RabbitMQ 客户端,这看起来很自然。 DefaultConsumerhandleDelivery 方法可以产生一个新的 Runnable 来完成工作并在工作结束时确认消息。预取计数有效地控制了消费者产生的最大 Runnable 数量。

但是,这种设计似乎无法转化为 Spring-AMQP 世界。在 SimpleMessageListenerContainer 中,对于每个 AMQP 消费者,所有消息都被传递到一个 BlockingQueueConsumer 中,并且一个线程将消息从 BlockingQueueConsumer 的阻塞队列传递到 MessageListener .尽管 SimpleMessageListenerContainer 支持 TaskExecutor,但 TaskExecutor 仅用于 运行 每个消费者一项任务。因此,要并行处理多条消息,必须使用多个消费者。

这让我想到了一些关于 Spring-AMQP 并行性的问题。首先,我的单一消费者和高预取计数的初始设计是实现与 AMQP 并行性的有效方法吗?如果是这样,为什么 Spring-AMQP 避开这种设计而支持每消费者线程设计?是否可以自定义 Spring-AMQP 以使单消费者并行成为可能?

Spring AMQP 是针对早期版本的 rabbit 客户端库设计的,每个连接只有一个线程。

The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work.

除了简单地增加 concurrentConsumers 之外,这并没有给你带来更多好处 - 唯一的区别是每个线程都有一个消费者,但那里没有太多开销。

你可以做你想做的,但是,使用 ChannelAwareMessageListener 并将 acknowledgeMode 设置为 MANUAL - 然后你的听众负责确认消息。

在 2.0(明年)中,我们将有一个替代的侦听器容器,它将直接在客户端库线程上调用侦听器。但是,这是相当多的工作。 This (closed) pull request has an initial PoC 但它还不是一个功能齐全的容器。