RabbitMQ:针对具有缓慢消费者的大型队列限制快速生产者

RabbitMQ: throttling fast producer against large queues with slow consumer

我们目前正在使用 RabbitMQ,其中持续超快的生产者与受有限资源限制的消费者配对(例如,慢速 MySQL 插入)。

我们不喜欢用 x-max-length 声明队列,因为一旦达到限制,所有消息都将被丢弃或死信化,我们不想丢失消息。

添加更多消费者很容易,但他们都将受到一个共享资源的限制,因此这是行不通的。问题依然存在:如何让生产者变慢?

当然,我们可以在 Redis、memcached、MySQL 或生产者读取为 pointed out in an answer to a similar question 的其他东西中放置一个流量控制标志,或者更好的是,生产者可以定期测试队列长度并节流自身,但这些对我来说似乎是 hack。

我主要是在质疑我是否有根本性的误解。我原以为这是一种常见的情况,所以我想知道:

限制生产者的最佳做法是什么? RabbitMQ 是如何做到这一点的?还是您以完全不同的方式进行此操作?

背景

假设制作人确实知道如何通过正确的输入来放慢自己的速度。例如。硬件传感器或硬件随机数生成器,可以根据需要生成任意数量的事件。

在我们特定的真实案例中,我们有一个 API 用户可以用来添加消息。如果队列是 "full",我们希望通过让 API return 出错来应用背压,而不是吞噬和丢弃消息,因此 caller/user 知道后退,或者让 API 阻塞直到消费者赶上。我们不控制我们的用户,所以无论消费者有多快,我都可以创建一个更快的生产者。

我希望有类似 API 的 TCP 套接字,其中 write() 可以阻塞,select() 可以用来确定句柄是否可写。因此,如果队列已满,要么让 RabbitMQ API 阻塞,要么让它 return 出错。

我不认为这在任何方面都是 rabbitmq 特有的。基本上你有一个场景,其中有两个具有不同处理能力的系统,这种不匹配会带来队列溢出的风险(无论它是什么),或者甚至在生产者和消费者之间持续不匹配的情况下,只需创建事件创建和处理之间的时间间隔越来越大。

我曾经处理过这种情况,不幸的是没有灵丹妙药。您要么必须加快处理速度(更好的硬件,更适合的软件?),要么限制事件创建(这实际上与 MQ 无关)。

现在,我想问你目标是什么,事件是如何产生的。这些事件是不断产生的,没有限制或只是非常高的速率(例如来自传感器的读数 - 越多越好),或者它们是在 batches/spikes 中创建的(例如:特定时间段的用户请求,从 CRM 系统批量加载)。我假设目标是处理所有内容,因为您提到您不想丢失任何排队的消息。

如果输出是恒定的,那么一定有一些限制器(内部计数器,如果生产者是唯一的生产者,或者外部队列长度检查队列是否可以被其他系统填充)。

IF eventsInTimePeriod/timePeriod > estimatedConsumerBandwidth
THEN LowerRate()
ELSE RiseRate()

在现实世界的场景中,我们过去常常简单地手动将输出限制为估计值,并且为队列长度、从队列进入到队列离开的时间等设置了一些警报。这些限制器被省略了(大部分是错误的)后来我们经常发现一些本来应该在几个小时内完成的任务,却要等三个月才能轮到他们。

如果我们对此一无所知,恐怕很难回答"How to slow down the producer?",但一些想法是:前面提到的速率检查或者可能是阻塞的 AddMessage 方法:

AddMessage(message)
    WHILE(getQueueLength() > maxAllowedQueueLength)
        spin(1000); // or sleep or whatever
    mqAdapter.AddMessage(message)

我想说这完全取决于生产者应用程序的具体情况以及您的架构。

对于 x-max-length 属性,您说过您不希望消息被丢弃或变成死信。我看到有一个更新,为此添加了更多功能。正如我所见,它在文档中指定:

"Use the overflow setting to configure queue overflow behaviour. If overflow is set to reject-publish, the most recently published messages will be discarded. In addition, if publisher confirms are enabled, the publisher will be informed of the reject via a basic.nack message"

据我了解,您可以使用队列限制来拒绝来自发布者的新消息,从而向上游施加一些背压。