RabbitMQ 异步支持

RabbitMQ asynchronous support

RabbitMQ .NET 客户端是否有任何类型的异步支持?我希望能够异步连接和使用消息,但到目前为止还没有找到方法。

(对于消费消息,我可以使用 EventingBasicConsumer,但这不是一个完整的解决方案。)

只是为了提供一些背景信息,这是我目前如何使用 RabbitMQ 的示例(代码取自我的博客):

var factory = new ConnectionFactory() { HostName = "localhost" };

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
        channel.BasicConsume("testqueue", true, consumer);

        Console.ReadLine();
    }
}

此时 RabbitMQ .NET 客户端没有 async/await 内置支持。 RabbitMQ .NET Client repository

上有一张公开票

Rabbit 支持使用 AsyncEventingBasicConsumer class 分派到异步消息处理程序。它的工作方式类似于 EventingBasicConsumer,但允许您注册一个 returns 和 Task 的回调。回调被分派到 RabbitMQ 客户端并等待返回的 Task

var factory = new ConnectionFactory
{
    HostName = "localhost",
    DispatchConsumersAsync = true
};

using(var connection = cf.CreateConnection())
{
    using(var channel = conn.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new AsyncEventingBasicConsumer(model);

        consumer.Received += async (o, a) =>
        {
            Console.WriteLine("Message Get" + a.DeliveryTag);
            await Task.Yield();
        };
    }

    Console.ReadLine();
}

总结当前 async/TPL 支持:

  • 如@paul-turner 所述,现在有一个 AsyncEventingBasicConsumer 可以注册事件,return 有一个 Task.
  • 还有一个AsyncDefaultBasicConsumer for which you can override virtual methods such as HandleBasicDeliver and return a Task. Original PR here(貌似5.0也引入了?)
  • 根据对上述 PR 和 this issue 的最终评论,他们似乎正在开发一个新的 from-scratch .NET 客户端,该客户端将更全面地支持 async 操作,但我没有看到任何与该工作相关的具体链接。

AsyncEventingBasicConsumer,它所做的只是在收到消息时 await 处理您的异步“事件处理程序”。这是这里唯一异步的东西。通常你不会从中获得任何利润,因为你只有一个“处理程序”。消息仍在处理 one-by-one。它们被同步处理!你也失去了对异常处理的控制,因为等待是在消费者内部完成的。

让我猜想,异步消息处理是指某种程度的并行性。

我最终使用的是来自 TPL Dataflow 的 ActionBlockActionBlock 运行与您配置的一样多的任务,管理等待和并行性。由于它在任务而不是线程上运行,只要它们真正异步,它就可以用更少的资源进行管理。

  1. 常规 EventingBasicConsumer 调用 actionBlock.Post(something)
  2. 对于并行处理,您需要告诉 RMQ 在您 ack 它们之前向您发送 N 条消息:model.BasicQos(0, N, true);
  3. ActionBlock 有选项 MaxDegreeOfParallelism 属性 也需要设置为 N.
  4. ActionBlock 运行 async Tasks 以接收消费者之前发布的数据。任务不应抛出,因为 ActionBlock 会停止所有异常处理。
  5. 小心传递 CancellationToken 并正确等待 ActionBlock 完成所有 运行 任务:actionBlock.Complete(); await actionBlock.Completion;