RabbitMQ 异步支持
RabbitMQ asynchronous support
RabbitMQ .NET 客户端是否有任何类型的异步支持?我希望能够异步连接和使用消息,但到目前为止还没有找到方法。
(对于消费消息,我可以使用 EventingBasicConsumer,但这不是一个完整的解决方案。)
只是为了提供一些背景信息,这是我目前如何使用 RabbitMQ 的示例(代码取自我的博客):
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("testqueue", true, consumer);
Console.ReadLine();
}
}
此时 RabbitMQ .NET 客户端没有 async/await 内置支持。 RabbitMQ .NET Client repository
上有一张公开票
Rabbit 支持使用 AsyncEventingBasicConsumer
class 分派到异步消息处理程序。它的工作方式类似于 EventingBasicConsumer
,但允许您注册一个 returns 和 Task
的回调。回调被分派到 RabbitMQ 客户端并等待返回的 Task
。
var factory = new ConnectionFactory
{
HostName = "localhost",
DispatchConsumersAsync = true
};
using(var connection = cf.CreateConnection())
{
using(var channel = conn.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new AsyncEventingBasicConsumer(model);
consumer.Received += async (o, a) =>
{
Console.WriteLine("Message Get" + a.DeliveryTag);
await Task.Yield();
};
}
Console.ReadLine();
}
总结当前 async
/TPL
支持:
- 如@paul-turner 所述,现在有一个
AsyncEventingBasicConsumer
可以注册事件,return 有一个 Task
.
- 还有一个
AsyncDefaultBasicConsumer
for which you can override virtual methods such as HandleBasicDeliver
and return a Task
. Original PR here(貌似5.0也引入了?)
- 根据对上述 PR 和 this issue 的最终评论,他们似乎正在开发一个新的 from-scratch .NET 客户端,该客户端将更全面地支持
async
操作,但我没有看到任何与该工作相关的具体链接。
有 AsyncEventingBasicConsumer
,它所做的只是在收到消息时 await
处理您的异步“事件处理程序”。这是这里唯一异步的东西。通常你不会从中获得任何利润,因为你只有一个“处理程序”。消息仍在处理 one-by-one。它们被同步处理!你也失去了对异常处理的控制,因为等待是在消费者内部完成的。
让我猜想,异步消息处理是指某种程度的并行性。
我最终使用的是来自 TPL Dataflow 的 ActionBlock
。 ActionBlock
运行与您配置的一样多的任务,管理等待和并行性。由于它在任务而不是线程上运行,只要它们真正异步,它就可以用更少的资源进行管理。
- 常规
EventingBasicConsumer
调用 actionBlock.Post(something)
。
- 对于并行处理,您需要告诉 RMQ 在您
ack
它们之前向您发送 N 条消息:model.BasicQos(0, N, true);
- ActionBlock 有选项
MaxDegreeOfParallelism
属性 也需要设置为 N.
- ActionBlock 运行
async Task
s 以接收消费者之前发布的数据。任务不应抛出,因为 ActionBlock 会停止所有异常处理。
- 小心传递
CancellationToken
并正确等待 ActionBlock 完成所有 运行 任务:actionBlock.Complete(); await actionBlock.Completion;
RabbitMQ .NET 客户端是否有任何类型的异步支持?我希望能够异步连接和使用消息,但到目前为止还没有找到方法。
(对于消费消息,我可以使用 EventingBasicConsumer,但这不是一个完整的解决方案。)
只是为了提供一些背景信息,这是我目前如何使用 RabbitMQ 的示例(代码取自我的博客):
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("testqueue", true, consumer);
Console.ReadLine();
}
}
此时 RabbitMQ .NET 客户端没有 async/await 内置支持。 RabbitMQ .NET Client repository
上有一张公开票Rabbit 支持使用 AsyncEventingBasicConsumer
class 分派到异步消息处理程序。它的工作方式类似于 EventingBasicConsumer
,但允许您注册一个 returns 和 Task
的回调。回调被分派到 RabbitMQ 客户端并等待返回的 Task
。
var factory = new ConnectionFactory
{
HostName = "localhost",
DispatchConsumersAsync = true
};
using(var connection = cf.CreateConnection())
{
using(var channel = conn.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new AsyncEventingBasicConsumer(model);
consumer.Received += async (o, a) =>
{
Console.WriteLine("Message Get" + a.DeliveryTag);
await Task.Yield();
};
}
Console.ReadLine();
}
总结当前 async
/TPL
支持:
- 如@paul-turner 所述,现在有一个
AsyncEventingBasicConsumer
可以注册事件,return 有一个Task
. - 还有一个
AsyncDefaultBasicConsumer
for which you can override virtual methods such asHandleBasicDeliver
and return aTask
. Original PR here(貌似5.0也引入了?) - 根据对上述 PR 和 this issue 的最终评论,他们似乎正在开发一个新的 from-scratch .NET 客户端,该客户端将更全面地支持
async
操作,但我没有看到任何与该工作相关的具体链接。
有 AsyncEventingBasicConsumer
,它所做的只是在收到消息时 await
处理您的异步“事件处理程序”。这是这里唯一异步的东西。通常你不会从中获得任何利润,因为你只有一个“处理程序”。消息仍在处理 one-by-one。它们被同步处理!你也失去了对异常处理的控制,因为等待是在消费者内部完成的。
让我猜想,异步消息处理是指某种程度的并行性。
我最终使用的是来自 TPL Dataflow 的 ActionBlock
。 ActionBlock
运行与您配置的一样多的任务,管理等待和并行性。由于它在任务而不是线程上运行,只要它们真正异步,它就可以用更少的资源进行管理。
- 常规
EventingBasicConsumer
调用actionBlock.Post(something)
。 - 对于并行处理,您需要告诉 RMQ 在您
ack
它们之前向您发送 N 条消息:model.BasicQos(0, N, true);
- ActionBlock 有选项
MaxDegreeOfParallelism
属性 也需要设置为 N. - ActionBlock 运行
async Task
s 以接收消费者之前发布的数据。任务不应抛出,因为 ActionBlock 会停止所有异常处理。 - 小心传递
CancellationToken
并正确等待 ActionBlock 完成所有 运行 任务:actionBlock.Complete(); await actionBlock.Completion;