运行 并行 Web 作业

Run Web Job in parallel

我们有一系列的 4 个服务总线队列,每个队列都有一个处理消息并将其传递到下一个队列的 Web 作业。虽然我们 运行 在单个内核上运行,但每个 Web 作业都是 async 并允许其他作业在查询数据库或端点时继续。

我们在 ServiceBusConfiguration

中设置了 MaxConcurrentCalls = 3

但是,现在所有消息都在最终队列中,它不会启动最终 Web 作业的多个实例来更快地处理它们,而是同步执行。我想知道如何将我的 Web 作业配置为 运行 并行执行相同的 Web 作业。

我注意到 this article 从 2014 年开始,这表明我们必须实现自己的并行处理,但最近的文章与此信息相矛盾,称它支持 OOTB。

仅适用于 Continuous WebJobs 可用于扩展多实例。 这是确定程序或脚本 运行 是在所有实例上还是仅在一个实例上。

在多个实例上 运行 的选项 不适用于免费或共享价格等级

在您的网络作业中,您会找到 JobHostConfiguration 对象的一个​​实例。此对象用于配置您的 webjob 的属性。

这是一个配置:

static void Main()
{
    var config = new JobHostConfiguration();
    config.UseTimers();
    config.Queues.MaxDequeueCount = 2;
    config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
    config.Queues.BatchSize = 2;
    var host = new JobHost(config);
    host.RunAndBlock();
}

所以让我们把项目分解成几块:

config.UseTimers();

config.UserTimers();允许我们在函数中使用 定时器触发器

config.Queues.MaxDequeueCount = 2;

MaxDequeueCount 是您的函数在出错时尝试处理消息的次数

config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);

MaxPollingInterval 是 WebJob 检查队列的最长时间。 如果这不是您想要的,您可以像我上面那样更改此设置,以便 WebJob 每 4 秒检查一次队列最大值

config.Queues.BatchSize = 2;

BatchSize 属性 是您的 WebJob 将同时处理的项目数量。项目将异步处理.

所以如果队列中有 2 个项目,它们将被并行处理。如果将此设置为 1,那么您将创建一个同步流,因为它一次只会从队列中取出一个项目。

有关更多详细信息,您可以参考此 article 以并行学习 运行 webjob。

更新:

方法BeginReceiveBatch/EndReceiveBatch允许你从Queue(Async)中获取多个"items",然后使用AsParallel将前面方法返回的IEnumerable转换成多个处理消息线程。

var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);
});

该代码从队列中检索 3 条消息,然后在“3 个线程”中处理(注意:不能保证它会使用 3 个线程,.NET 会分析系统资源,如果出现这种情况,它最多会使用 3 个线程必要的。)

更多细节,你可以参考这个case

通过设置 ServiceBusConfiguration.PrefetchCountServiceBusConfiguration.MessageOptions.MaxConcurrentCalls,我已经能够看到单个 webjob 将多条消息出列并并行处理它们。