运行 并行 Web 作业
Run Web Job in parallel
我们有一系列的 4 个服务总线队列,每个队列都有一个处理消息并将其传递到下一个队列的 Web 作业。虽然我们 运行 在单个内核上运行,但每个 Web 作业都是 async
并允许其他作业在查询数据库或端点时继续。
我们在 ServiceBusConfiguration
中设置了 MaxConcurrentCalls = 3
但是,现在所有消息都在最终队列中,它不会启动最终 Web 作业的多个实例来更快地处理它们,而是同步执行。我想知道如何将我的 Web 作业配置为 运行 并行执行相同的 Web 作业。
我注意到 this article 从 2014 年开始,这表明我们必须实现自己的并行处理,但最近的文章与此信息相矛盾,称它支持 OOTB。
仅适用于 Continuous WebJobs 可用于扩展多实例。
这是确定程序或脚本 运行 是在所有实例上还是仅在一个实例上。
在多个实例上 运行 的选项 不适用于免费或共享价格等级。
在您的网络作业中,您会找到 JobHostConfiguration
对象的一个实例。此对象用于配置您的 webjob 的属性。
这是一个配置:
static void Main()
{
var config = new JobHostConfiguration();
config.UseTimers();
config.Queues.MaxDequeueCount = 2;
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
config.Queues.BatchSize = 2;
var host = new JobHost(config);
host.RunAndBlock();
}
所以让我们把项目分解成几块:
config.UseTimers();
config.UserTimers();允许我们在函数中使用 定时器触发器 。
config.Queues.MaxDequeueCount = 2;
MaxDequeueCount 是您的函数在出错时尝试处理消息的次数。
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
MaxPollingInterval 是 WebJob 检查队列的最长时间。
如果这不是您想要的,您可以像我上面那样更改此设置,以便 WebJob 每 4 秒检查一次队列最大值。
config.Queues.BatchSize = 2;
BatchSize 属性 是您的 WebJob 将同时处理的项目数量。项目将异步处理.
所以如果队列中有 2 个项目,它们将被并行处理。如果将此设置为 1,那么您将创建一个同步流,因为它一次只会从队列中取出一个项目。
有关更多详细信息,您可以参考此 article 以并行学习 运行 webjob。
更新:
方法BeginReceiveBatch/EndReceiveBatch允许你从Queue(Async)中获取多个"items",然后使用AsParallel
将前面方法返回的IEnumerable转换成多个处理消息线程。
var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);
messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
ProcessMessage(item);
});
该代码从队列中检索 3 条消息,然后在“3 个线程”中处理(注意:不能保证它会使用 3 个线程,.NET 会分析系统资源,如果出现这种情况,它最多会使用 3 个线程必要的。)
更多细节,你可以参考这个case。
通过设置 ServiceBusConfiguration.PrefetchCount
和 ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls
,我已经能够看到单个 webjob 将多条消息出列并并行处理它们。
我们有一系列的 4 个服务总线队列,每个队列都有一个处理消息并将其传递到下一个队列的 Web 作业。虽然我们 运行 在单个内核上运行,但每个 Web 作业都是 async
并允许其他作业在查询数据库或端点时继续。
我们在 ServiceBusConfiguration
MaxConcurrentCalls = 3
但是,现在所有消息都在最终队列中,它不会启动最终 Web 作业的多个实例来更快地处理它们,而是同步执行。我想知道如何将我的 Web 作业配置为 运行 并行执行相同的 Web 作业。
我注意到 this article 从 2014 年开始,这表明我们必须实现自己的并行处理,但最近的文章与此信息相矛盾,称它支持 OOTB。
仅适用于 Continuous WebJobs 可用于扩展多实例。 这是确定程序或脚本 运行 是在所有实例上还是仅在一个实例上。
在多个实例上 运行 的选项 不适用于免费或共享价格等级。
在您的网络作业中,您会找到 JobHostConfiguration
对象的一个实例。此对象用于配置您的 webjob 的属性。
这是一个配置:
static void Main()
{
var config = new JobHostConfiguration();
config.UseTimers();
config.Queues.MaxDequeueCount = 2;
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
config.Queues.BatchSize = 2;
var host = new JobHost(config);
host.RunAndBlock();
}
所以让我们把项目分解成几块:
config.UseTimers();
config.UserTimers();允许我们在函数中使用 定时器触发器 。
config.Queues.MaxDequeueCount = 2;
MaxDequeueCount 是您的函数在出错时尝试处理消息的次数。
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
MaxPollingInterval 是 WebJob 检查队列的最长时间。 如果这不是您想要的,您可以像我上面那样更改此设置,以便 WebJob 每 4 秒检查一次队列最大值。
config.Queues.BatchSize = 2;
BatchSize 属性 是您的 WebJob 将同时处理的项目数量。项目将异步处理.
所以如果队列中有 2 个项目,它们将被并行处理。如果将此设置为 1,那么您将创建一个同步流,因为它一次只会从队列中取出一个项目。
有关更多详细信息,您可以参考此 article 以并行学习 运行 webjob。
更新:
方法BeginReceiveBatch/EndReceiveBatch允许你从Queue(Async)中获取多个"items",然后使用AsParallel
将前面方法返回的IEnumerable转换成多个处理消息线程。
var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);
messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
ProcessMessage(item);
});
该代码从队列中检索 3 条消息,然后在“3 个线程”中处理(注意:不能保证它会使用 3 个线程,.NET 会分析系统资源,如果出现这种情况,它最多会使用 3 个线程必要的。)
更多细节,你可以参考这个case。
通过设置 ServiceBusConfiguration.PrefetchCount
和 ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls
,我已经能够看到单个 webjob 将多条消息出列并并行处理它们。