Parallel.ForEach 表现
Parallel.ForEach performance
我正在使用 Parallel.ForEach
提取一堆压缩文件并将它们复制到另一台机器上的共享文件夹,然后 BULK INSERT
进程开始。这一切都很好,但我注意到,一旦出现一些大文件,就不会启动新任务。我认为这是因为某些文件比其他文件花费的时间更长,TPL 开始缩减,并停止创建新任务。我已将 MaxDegreeOfParallelism
设置为一个合理的数字 (8)。当我查看 CPU activity 时,我可以看到,大多数时候 SQL 服务器计算机的利用率低于 30%,当它位于单个 BULK INSERT
任务。我认为它可以做更多的工作。我可以以某种方式强制 TPL 创建更多同时处理的任务吗?
原因很可能是 Parallel.ForEach
默认处理项目的方式。如果你在数组或实现 IList
的东西上使用它(以便总长度和索引器可用) - 它会分批分割整个工作负载。然后单独的线程将处理每个批次。这意味着如果批次有不同的 "size"(按大小我指的是处理它们的时间)- "small" 批次将完成得更快。
例如,让我们看这段代码:
var delays = Enumerable.Repeat(100, 24).Concat(Enumerable.Repeat(2000, 4)).ToArray();
Parallel.ForEach(delays, new ParallelOptions() {MaxDegreeOfParallelism = 4}, d =>
{
Thread.Sleep(d);
Console.WriteLine("Done with " + d);
});
如果您 运行 它,您会看到所有“100”(快速)项都被快速并行处理。然而,所有“2000”(慢)项目最终都被一个一个地处理,根本没有任何并行化。那是因为所有 "slow" 项都在同一批次中。工作负载分为 4 批 (MaxDegreeOfParallelism = 4
),前 3 批仅包含快速项目。它们完成得很快。最后一批有所有缓慢的项目,因此专用于这批的线程将一个接一个地处理它们。
您可以 "fix" 根据您的情况,通过确保项目均匀分布(这样 "slow" 项目不会在源集合中全部在一起),或者例如使用自定义分区程序:
var delays = Enumerable.Repeat(100, 24).Concat(Enumerable.Repeat(2000, 4)).ToArray();
var partitioner = Partitioner.Create(delays, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions {MaxDegreeOfParallelism = 4}, d =>
{
Thread.Sleep(d);
Console.WriteLine("Done with " + d);
});
NoBuffering
确保一次拿走一件物品,因此避免了这个问题。
使用其他方法并行化您的工作(例如 SemaphoreSlim
或 BlockingCollection
)也是一种选择。
我正在使用 Parallel.ForEach
提取一堆压缩文件并将它们复制到另一台机器上的共享文件夹,然后 BULK INSERT
进程开始。这一切都很好,但我注意到,一旦出现一些大文件,就不会启动新任务。我认为这是因为某些文件比其他文件花费的时间更长,TPL 开始缩减,并停止创建新任务。我已将 MaxDegreeOfParallelism
设置为一个合理的数字 (8)。当我查看 CPU activity 时,我可以看到,大多数时候 SQL 服务器计算机的利用率低于 30%,当它位于单个 BULK INSERT
任务。我认为它可以做更多的工作。我可以以某种方式强制 TPL 创建更多同时处理的任务吗?
原因很可能是 Parallel.ForEach
默认处理项目的方式。如果你在数组或实现 IList
的东西上使用它(以便总长度和索引器可用) - 它会分批分割整个工作负载。然后单独的线程将处理每个批次。这意味着如果批次有不同的 "size"(按大小我指的是处理它们的时间)- "small" 批次将完成得更快。
例如,让我们看这段代码:
var delays = Enumerable.Repeat(100, 24).Concat(Enumerable.Repeat(2000, 4)).ToArray();
Parallel.ForEach(delays, new ParallelOptions() {MaxDegreeOfParallelism = 4}, d =>
{
Thread.Sleep(d);
Console.WriteLine("Done with " + d);
});
如果您 运行 它,您会看到所有“100”(快速)项都被快速并行处理。然而,所有“2000”(慢)项目最终都被一个一个地处理,根本没有任何并行化。那是因为所有 "slow" 项都在同一批次中。工作负载分为 4 批 (MaxDegreeOfParallelism = 4
),前 3 批仅包含快速项目。它们完成得很快。最后一批有所有缓慢的项目,因此专用于这批的线程将一个接一个地处理它们。
您可以 "fix" 根据您的情况,通过确保项目均匀分布(这样 "slow" 项目不会在源集合中全部在一起),或者例如使用自定义分区程序:
var delays = Enumerable.Repeat(100, 24).Concat(Enumerable.Repeat(2000, 4)).ToArray();
var partitioner = Partitioner.Create(delays, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions {MaxDegreeOfParallelism = 4}, d =>
{
Thread.Sleep(d);
Console.WriteLine("Done with " + d);
});
NoBuffering
确保一次拿走一件物品,因此避免了这个问题。
使用其他方法并行化您的工作(例如 SemaphoreSlim
或 BlockingCollection
)也是一种选择。