RabbitMQ:如何限制消费率

RabbitMQ: how to limit consuming rate

我需要限制从 rabbitmq 队列中消费消息的速率。

我找到了很多建议,但其中大部分建议使用预取选项。但是这个选项不能满足我的需要。即使我将预取设置为 1,速率也约为 6000 messages/sec。这对消费者来说太多了。

我需要限制例如每秒 70 到 200 条消息。这意味着每 5-14 毫秒消耗一条消息。没有同步消息。

我正在使用 Node.JS 和 amqp.node 库。

我不认为 RabbitMQ 可以为您提供开箱即用的功能。

如果你只有一个消费者,那么整个事情就很简单了,你只要让它在消费消息之间休眠即可。

如果您有多个消费者,我建议您使用一些 "shared memory" 来保持费率。例如,您可能有 10 个消费者使用消息。为了在所有这些消息中保持 70-200 条消息的速率,您将调用 Redis,以查看您是否有资格处理消息。如果是,则更新 Redis,以向其他消费者显示当前正在处理一条消息。

如果您无法控制消费者,则实施选项 1 或 2 并将消息发布回 Rabbit。这样,原始消费者将以所需的速度消费消息。

我已经找到了解决办法。

我使用 npm 的模块 nanotimer 来计算延迟。

然后我计算 delay = 1 / [message_per_second] 以纳秒为单位。

然后我使用 prefetch = 1 使用消息

然后我计算真正的延迟为延迟 - [processing_message_time]

然后我在发送消息确认之前让超时=真的延迟

效果很好。感谢大家

实施令牌桶可能会有所帮助: https://en.wikipedia.org/wiki/Token_bucket

您可以编写一个生成器,以固定速率向 "token bucket queue" 生成消息并在消息上添加 TTL(可能会在一秒后过期?),或者只设置最大队列大小等于您的每秒速率.接收 "normal queue" 消息的消费者还必须接收 "token bucket queue" 消息,以便有效地处理消息以限制应用程序的速率。

NodeJS + amqplib 示例:

var queueName = 'my_token_bucket';
rabbitChannel.assertQueue(queueName, {durable: true, messageTtl: 1000, maxLength: bucket.ratePerSecond});
writeToken();

function writeToken() {
    rabbitChannel.sendToQueue(queueName, new Buffer(new Date().toISOString()), {persistent: true});
    setTimeout(writeToken, 1000 / bucket.ratePerSecond);
}

参见 RabbitMQ Documentation 中的 'Fair Dispatch'。

For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

这就是我用 settimeout 修复我的问题的方法

我将我的设置为每 200 毫升处理一次消耗,这将在 1 秒内消耗 5 个数据如果存在,我会进行更新

channel.consume(transactionQueueName, async (data) => {
   let dataNew = JSON.parse(data.content);
       const processedTransaction = await seperateATransaction(dataNew);
        // delay ack to avoid duplicate entry !important dont remove the settimeout
        setTimeout(function(){
          channel.ack(data);
        },200);
 });

完成