在多个进程中使用来自 Azure 服务总线的消息

Consuming messages from Azure Service bus in multiple processes

我正在开发一个基于 Azure 服务总线的系统,用于通过 API 和后台服务通过主题对大量消息进行异步处理来快速实现即发即弃。在这个问题的上下文中,该主题只有一个订阅,为什么它可能是一个队列。由于其他原因,我想保留这个话题。

我最近将代码从使用 WindowsAzure.ServiceBus 包的 .NET Framework 应用程序迁移到使用 Microsoft.Azure.ServiceBus 包的 .NET Core 包。要处理大量消息,我使用 MessageReceiver class 像这样:

var connString = "...";
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath("topic", "subscription");
var messageReceiver = new MessageReceiver(connString, subscriptionPath);
while (...)
{
    var messages = await messageReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(5));
    ...
}

为简单起见,我隐藏了一系列细节。就像我的应用程序启动 5 个线程并在每个线程中使用相同的 messageReceiver 实例处理消息一样。

我通常有这个应用程序的多个实例 运行 分布在线程和进程中。我相信我们终于得出了我的问题的代码。迁移到 .NET Core 和新的 NuGet 包后,我注意到只有一个应用程序在同时处理消息。当打开两个控制台 windows 并在每个 window 中启动一个进程时,我可以看到 window 中的应用程序 1 开始处理。 windows 2 中的应用程序不处理任何内容。数秒后,windows 1 中的应用停止处理,window 2 中的应用开始处理。一段时间后,它切换回来。开关中没有真正的模式,但我的所有消息都已成功处理。

MessageReceiver 中是否存在某种限制,允许最大线程总数处理来自同一订阅或类似内容的消息?

我不知道 MessageReceiver 在线程数方面有任何限制。新库虽然经过优化以利用并发性 w/o 对线程(异步代码)的需求。所以从技术上讲,你可以 运行 使用单个线程并有多个并发接收任务。另一种方法是使用 QueueClientSubscriptionClient 提供的消息处理程序,它们允许指定并发性以轻松处理多条消息,但它们允许每个并发回调接收一条消息(无批处理)。

代理在对第一个竞争消费者的单次调用中提供尽可能多的消息。如果没有足够的消息,所有消息将被提供给单个(或前几个)消费者。没有循环和公平分配。它确实按预期工作。