如何使用 .NET RabbitMQ 客户端为消费者使用预取计数

How to utilize prefetch count for consumer with .NET RabbitMQ client

它在 RabbitMQ 文档中说明如下

"As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads."

目前我们正在查看预取计数,建议如果您有少量消费者并且 autoack=false,那么我们应该一次消费很多消息。但是,我们发现如果消费者使用单个执行线程发回手动确认,则预取不会生效。但是,如果我们将消费者处理包装在一个任务中,我们会发现预取计数确实很重要并且会显着提高消费者性能。

请参阅以下示例,其中我们将消费者对消息的消费包装在任务对象中:

class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "172.20.20.13",
            UserName = "billy",
            Password = "guest",
            Port = 5671,
            VirtualHost = "/",
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = "rabbit.blah.com",
                Version = System.Security.Authentication.SslProtocols.Tls12
            }
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        channel.BasicQos(0, 100, false);
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        var queueName = channel.QueueDeclare().QueueName;
        Console.WriteLine(" [*] Waiting for logs.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var _result = new Task(() => {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                System.Threading.Thread.Sleep(80);

                channel.BasicAck(ea.DeliveryTag, false);
            });
            _result.Start();
        };
        channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

我的问题是人们如何使用利用预取计数的 .NET rabbitmq 客户端实现消费者?您是否必须使用某种任务手动确认?安全吗

来源:https://www.rabbitmq.com/api-guide.html

When manual acknowledgements are used, it is important to consider what thread does the acknowledgement. If it's different from the thread that received the delivery (e.g. Consumer#handleDelivery delegated delivery handling to a different thread), acknowledging with the multiple parameter set to true is unsafe and will result in double-acknowledgements, and therefore a channel-level protocol exception that closes the channel. Acknowledging a single message at a time can be safe.

channel.basicAck(tag, false) 是线程安全的

consumerChannel.basicAck(tag, true) 不是。

还有

中提到的一些优点

您参考的文档是针对 Java 客户端的。您应该指的是 this document

您使用的是最新版本的 .NET 客户端 (5.1),因此您在 Received 事件处理程序中的工作不会阻塞其他处理 TCP 数据的线程和它不会阻止心跳 - 两者都很好。

首先,调用 channel.BasicQos(0, 1, false) 意味着您的消费者一次只能从 RabbitMQ 接收一条就绪消息,并且在调用 BasicAck 之前不会发送另一条消息。所以,真的没有理由在另一个线程中完成你的工作,因为无论如何你都不会收到另一条消息。

如果您增加预取值(通过实验和 运行ning 基准测试),如果您的工作花费超过几毫秒到 运行,则您将不得不在后台线程中完成您的工作.

当您在 Received 事件回调中工作时,它将阻塞用于进行该回调的线程,因为该回调不在其自己的线程上执行。所以,你可以确保你的工作很短,或者在另一个线程中完成工作。

我刚刚花了一些时间检查 .NET 客户端代码,我很确定 IModel 实例不是线程安全的。如果增加预取,您将有机会同时确认多个消息,因此我建议实施一个使用它的解决方案,并确保在创建连接的同一线程上调用 BasicAck


注意: RabbitMQ 团队监控 rabbitmq-users mailing list 并且有时只在 Whosebug 上回答问题。