限制入站 AMQP 消息

Throttling inbound AMQP messages

我的系统是这样的:

[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue]

队列是 RabbitMQ 3.5.6。我正在使用 Spring Integration 4.2.1、Spring AMQP 1.5.1 和 Spring Integration Java DSL 1.1.0.

我想通过 Service1 限制队列 Q1 中消息的消耗,具体取决于当前正在处理的消息数量以及尚未到达 Outbound queue -例如。我想要最大。一次处理 10 条消息。那是因为处理部分比较耗资源,不想让系统超负荷。

我现在的流程初始部分配置简单如下:

IntegrationFlows
    .from(Amqp.inboundAdapter(connectionFactory, "Q1"))
    .handle(message -> service1.process(message.getPayload())
    .get();

Service1ServiceN 可以通信(它是同一个 JVM),所以我可以在它们之间实现锁定机制,以便 service1.process() 在继续执行之前阻塞已达到 "in processing" 条消息限制。这就是 - 如果我理解正确 - @Gary Russell 在 this comment 中建议。然而,它会导致消息从代理中获取并以未确认的状态挂在那里一段时间。有没有办法完全不从队列中提取消息?

@Artem Bilan 的 answer 使用 SimpleMessageListenerContainer.stop()/.start() 看起来相当重量级,看看实现和所有将被调用的 shutdown/startup 逻辑。

这两个答案现在都已有两年历史了。有更好的建议吗?

不使用消息驱动的适配器,不。代理将消息推送给消费者(根据prefetch-count)。

我不确定您为什么反对处于未确认状态的消息。

另一种方法是使用简单的轮询 <int:inbound-channel-adapter/>,轮询器中有 10 个线程,在适配器调用的 POJO 中,使用 RabbitTemplatereceive() 消息,但这比消息驱动的适配器效率低。