C# Parallel.ForEach 在长时间迭代时阻塞

C# Parallel.ForEach blocked on long iteration

我一直在使用 Parallel.ForEach 对项目集合进行一些耗时的处理。处理实际上是由外部命令行工具处理的,我无法更改它。但是,似乎 Parallel.ForEach 将在集合中的一个长 运行 项目上获得 "stuck"。我已经提炼出问题,可以证明 Parallel.ForEach 实际上是在等待这个漫长的过程结束,并且不允许任何其他人通过。我写了一个控制台应用程序来演示这个问题:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace testParallel
{
    class Program
    {
        static int inloop = 0;
        static int completed = 0;
        static void Main(string[] args)
        {
            // initialize an array integers to hold the wait duration (in milliseconds)
            var items = Enumerable.Repeat(10, 1000).ToArray();

            // set one of the items to 10 seconds
            items[50] = 10000;


            // Initialize our line for reporting status
            Console.Write(0.ToString("000") + " Threads, " + 0.ToString("000") + " completed");

            // Start the loop in a task (to avoid SO answers having to do with the Parallel.ForEach call, itself, not being parallel)
            var t = Task.Factory.StartNew(() => Process(items));

            // Wait for the operations to compelte
            t.Wait();

            // Report finished
            Console.WriteLine("\nDone!");
        }

        static void Process(int[] items)
        {
            // SpinWait (not sleep or yield or anything) for the specified duration
            Parallel.ForEach(items, (msToWait) =>
            {
                // increment the counter for how many threads are in the loop right now
                System.Threading.Interlocked.Increment(ref inloop);

                // determine at what time we shoule stop spinning
                var e = DateTime.Now + new TimeSpan(0, 0, 0, 0, msToWait);

                // spin until the target time
                while (DateTime.Now < e) /* no body -- just a hard loop */;

                // count another completed
                System.Threading.Interlocked.Increment(ref completed);

                // we're done with this iteration
                System.Threading.Interlocked.Decrement(ref inloop);

                // report status
                Console.Write("\r" + inloop.ToString("000") + " Threads, " + completed.ToString("000") + " completed");

            });
        }
    }
}

基本上,我制作了一个 int 数组来存储给定操作所花费的毫秒数。我将它们全部设置为 10,除了一个,我将其设置为 10000(因此,10 秒)。我在任务中启动 Parallel.ForEach 并在硬旋转等待中处理每个整数(因此它不应该屈服或休眠或任何东西)。 在每次迭代中,我都会报告现在循环体中有多少次迭代,以及我们已经完成了多少次迭代。大多数情况下,它进展顺利。然而,在接近尾声时(时间方面),它报告“001 线程,987 已完成”。

我的问题是为什么它不使用其他 7 个内核来处理剩余的 13 个"jobs"?这一 运行 长的迭代应该不会阻止它处理集合中的其他元素,对吗?

这个例子恰好是一个固定的集合,但它很容易被设置为一个可枚举的。我们不想仅仅因为需要很长时间就停止获取可枚举项中的下一项。

我找到了答案(或者至少,一个答案)。它与块分区有关。 SO 回答 here 帮我找到了。所以基本上,在我的 "Process" 函数的顶部,如果我改变这个:

        static void Process(int[] items)
        {
            Parallel.ForEach(items, (msToWait) => { ... });
        }

至此

        static void Process(int[] items)
        {
            var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);
            Parallel.ForEach(partitioner, (msToWait) => { ... });
        }

它一次抓取一个工作。对于每个并行的更典型的情况,其中 body 不超过一秒,我当然可以看到分块工作集。然而,在我的用例中,每个 body 部分可能需要半秒到 5 小时不等。我当然不希望一堆 10 秒的综艺元素被一个 5 小时的元素挡住。因此,在这种情况下,"one-at-a-time" 的开销是非常值得的。