Parallel.ForEach 比正常的 foreach 慢

Parallel.ForEach slower than normal foreach

我正在 C# 控制台应用程序中使用 Parallel.ForEach,但似乎无法正确处理。我正在创建一个包含随机数的数组,我有一个连续的 foreach 和一个 Parallel.ForEach 来找到数组中的最大值。使用 C++ 中大致相同的代码,我开始看到在数组中使用 3M 值的多个线程的权衡。但是 Parallel.ForEach 即使在 100M 值时也慢两倍。我做错了什么?

class Program
{
    static void Main(string[] args)
    {
        dostuff();

    }

    static void dostuff() {
        Console.WriteLine("How large do you want the array to be?");
        int size = int.Parse(Console.ReadLine());

        int[] arr = new int[size];
        Random rand = new Random();
        for (int i = 0; i < size; i++)
        {
            arr[i] = rand.Next(0, int.MaxValue);
        }

        var watchSeq = System.Diagnostics.Stopwatch.StartNew();
        var largestSeq = FindLargestSequentially(arr);
        watchSeq.Stop();
        var elapsedSeq = watchSeq.ElapsedMilliseconds;
        Console.WriteLine("Finished sequential in: " + elapsedSeq + "ms. Largest = " + largestSeq);

        var watchPar = System.Diagnostics.Stopwatch.StartNew();
        var largestPar = FindLargestParallel(arr);
        watchPar.Stop();
        var elapsedPar = watchPar.ElapsedMilliseconds;
        Console.WriteLine("Finished parallel in: " + elapsedPar + "ms Largest = " + largestPar);

        dostuff();
    }

    static int FindLargestSequentially(int[] arr) {
        int largest = arr[0];
        foreach (int i in arr) {
            if (largest < i) {
                largest = i;
            }
        }
        return largest;
    }

    static int FindLargestParallel(int[] arr) {
        int largest = arr[0];
        Parallel.ForEach<int, int>(arr, () => 0, (i, loop, subtotal) =>
        {
            if (i > subtotal)
                subtotal = i;
            return subtotal;
        },
        (finalResult) => {
            Console.WriteLine("Thread finished with result: " + finalResult);
            if (largest < finalResult) largest = finalResult;
        }
        );
        return largest;
    }
}

这里的一些想法:在并行的情况下,涉及到线程管理逻辑,它决定了它想要使用多少线程。这个线程管理逻辑大概可能 运行s 在你的主线程上。每当线程 return 具有新的最大值时,管理逻辑就会启动并确定下一个工作项(数组中要处理的下一个数字)。我很确定这需要某种锁定。无论如何,确定下一个项目甚至可能比执行比较操作本身的成本更高。

对我来说,这听起来比处理一个接一个数字的单个线程要多得多。在单线程情况下,有许多优化在起作用:没有边界检查,CPU 可以将数据加载到 CPU 内的一级缓存中,等等。不确定,这些优化中的哪一个适用对于并行情况。

请记住,在典型的台式计算机上,只有 2 到 4 个 CPU 物理内核可用,因此您永远不会拥有比实际工作更多的内核。所以如果并行处理开销是单线程操作的2-4倍以上,并行版本必然会更慢,你在观察。

您是否尝试过 运行 在 32 核计算机上执行此操作? ;-)

更好的解决方案是确定覆盖整个数组的非重叠范围(开始+停止索引),并让每个并行任务处理一个范围。这样,每个并行任务都可以在内部执行紧密的单线程循环,并且只有 return 一旦处理完整个范围。您甚至可以根据机器的逻辑核心数确定接近最佳的范围数。我还没有尝试过这个,但我很确定你会看到单线程情况下的改进。

Parallel Foreach 循环应该 运行 更慢,因为使用的算法不是并行的,并且 运行 这个算法还有很多工作要做。

在单线程中,为了找到最大值,我们可以将第一个数字作为我们的最大值并将其与数组中的所有其他数字进行比较。如果其中一个数字大于我们的第一个数字,我们交换并继续。这样我们访问数组中的每个数字一次,总共进行 N 次比较。

在上面的并行循环中,算法会产生开销,因为每个操作都包含在具有 return 值的函数调用中。因此,除了进行比较之外,在调用堆栈上添加和删除这些调用也是 运行ning 的开销。另外,由于每次调用都依赖于之前函数调用的值,所以需要依次运行

在下面的并行 For 循环中,数组被划分为由变量 threadNumber 确定的显式线程数。这将函数调用的开销限制在一个较低的数量。

请注意,对于低值,并行循环执行速度较慢。但是,对于 100M,经过的时间有所减少。

static int FindLargestParallel(int[] arr)
{
    var answers = new ConcurrentBag<int>();
    int threadNumber = 4;

    int partitionSize = arr.Length/threadNumber;
    Parallel.For(0, /* starting number */
        threadNumber+1, /* Adding 1 to threadNumber in case array.Length not evenly divisible by threadNumber */
        i =>
        {
            if (i*partitionSize < arr.Length) /* check in case # in array is divisible by # threads */
            {
                var max = arr[i*partitionSize];
                for (var x = i*partitionSize; 
                    x < (i + 1)*partitionSize && x < arr.Length;
                    ++x)
                {
                    if (arr[x] > max)
                        max = arr[x];
                }
                answers.Add(max);
            }
        });

    /* note the shortcut in finding max in the bag */    
    return answers.Max(i=>i);
}

这是拥有一个非常小的代表机构的性能后果。

我们可以使用分区实现更好的性能。在这种情况下,主体代表执行具有高数据量的工作。

static int FindLargestParallelRange(int[] arr)
{
    object locker = new object();
    int largest = arr[0];
    Parallel.ForEach(Partitioner.Create(0, arr.Length), () => arr[0], (range, loop, subtotal) =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
            if (arr[i] > subtotal)
                subtotal = arr[i];
        return subtotal;
    },
    (finalResult) =>
    {
        lock (locker)
            if (largest < finalResult)
                largest = finalResult;
    });
    return largest;
}

注意同步localFinally委托。另请注意需要正确初始化 localInit:() => arr[0] 而不是 () => 0.

使用 PLINQ 进行分区:

static int FindLargestPlinqRange(int[] arr)
{
    return Partitioner.Create(0, arr.Length)
        .AsParallel()
        .Select(range =>
        {
            int largest = arr[0];
            for (int i = range.Item1; i < range.Item2; i++)
                if (arr[i] > largest)
                    largest = arr[i];
            return largest;
        })
        .Max();
}

我强烈推荐 Stephen Toub 的免费书籍 Patterns of Parallel Programming

正如其他回答者所提到的,您尝试对此处的每个项目执行的操作是如此微不足道,以至于有各种其他因素最终比您正在做的实际工作更重要。这些可能包括:

  • JIT 优化
  • CPU分支预测
  • I/O(定时器为运行时输出线程结果)
  • 调用委托的成本
  • 任务管理成本
  • 系统错误地猜测最佳线程策略
  • memory/cpu缓存
  • 内存压力
  • 环境(调试)
  • 等等

运行 每种方法一次都不足以进行测试,因为它使上述许多因素在一次迭代中比在另一次迭代中的权重更大。您应该从更强大的基准测试策略开始。

此外,您的实施实际上是不正确的。 The documentation 具体说:

The localFinally delegate is invoked once per task to perform a final action on each task’s local state. This delegate might be invoked concurrently on multiple tasks; therefore, you must synchronize access to any shared variables.

您没有同步您的最终委托,因此您的函数很容易出现竞争条件,从而产生不正确的结果。

在大多数情况下,解决这个问题的最佳方法是利用比我们聪明的人所做的工作。 In my testing,以下方法似乎是总体上最快的:

return arr.AsParallel().Max();

尝试将集合分成多个批次,运行 个批次并行,其中批次的数量对应于您的 CPU 个核心数。 我 运行 一些方程式 1K、10K 和 1M 次使用以下方法:

  1. “for”循环。
  2. 来自 System.Threading.Tasks 库的“Parallel.For”,跨越整个集。
  3. 一个“Parallel.For”跨越 4 个批次。
  4. 来自 System.Threading.Tasks 库的“Parallel.For每个”,跨越整个集合。
  5. 4 个批次中的“Parallel.For每个”。

结果: (以秒为单位)

结论:
使用“Parallel.ForEach”并行处理批处理在超过 10K 条记录的情况下具有最佳结果。我相信批处理有帮助,因为它利用了所有 CPU 个内核(本例中为 4 个),而且还最大限度地减少了与并行化相关的线程开销。

这是我的代码:

        public void ParallelSpeedTest()
    {
        var rnd = new Random(56);
        int range = 1000000;
        int numberOfCores = 4;
        int batchSize = range / numberOfCores;
        int[] rangeIndexes = Enumerable.Range(0, range).ToArray();
        double[] inputs = rangeIndexes.Select(n => rnd.NextDouble()).ToArray();
        double[] weights = rangeIndexes.Select(n => rnd.NextDouble()).ToArray();
        double[] outputs = new double[rangeIndexes.Length];

        /// Series "for"...
        var startTimeSeries = DateTime.Now;
        for (var i = 0; i < range; i++)
        {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        }
        var durationSeries = DateTime.Now - startTimeSeries;

        /// "Parallel.For"...
        var startTimeParallel = DateTime.Now;
        Parallel.For(0, range, (i) => {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        });
        var durationParallelFor = DateTime.Now - startTimeParallel;

        /// "Parallel.For" in Batches...
        var startTimeParallel2 = DateTime.Now;
        Parallel.For(0, numberOfCores, (c) => {
            var endValue = (c == numberOfCores - 1) ? range : (c + 1) * batchSize;
            var startValue = c * batchSize;
            for (var i = startValue; i < endValue; i++)
            {
                outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
            }
        });
        var durationParallelForBatches = DateTime.Now - startTimeParallel2;

        /// "Parallel.ForEach"...
        var startTimeParallelForEach = DateTime.Now;
        Parallel.ForEach(rangeIndexes, (i) => {
            outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
        });
        var durationParallelForEach = DateTime.Now - startTimeParallelForEach;

        /// Parallel.ForEach in Batches...
        List<Tuple<int,int>> ranges = new List<Tuple<int, int>>();
        for (var i = 0; i < numberOfCores; i++)
        {
            int start = i * batchSize;
            int end = (i == numberOfCores - 1) ? range : (i + 1) * batchSize;
            ranges.Add(new Tuple<int,int>(start, end));
        }
        var startTimeParallelBatches = DateTime.Now;
        Parallel.ForEach(ranges, (range) => {
            for(var i = range.Item1; i < range.Item1; i++) {
                outputs[i] = Math.Sqrt(Math.Pow(inputs[i] * weights[i], 2));
            }
        });
        var durationParallelForEachBatches = DateTime.Now - startTimeParallelBatches;

        Debug.Print($"=================================================================");
        Debug.Print($"Given: Set-size: {range}, number-of-batches: {numberOfCores}, batch-size: {batchSize}");
        Debug.Print($".................................................................");
        Debug.Print($"Series For:                       {durationSeries}");
        Debug.Print($"Parallel For:                 {durationParallelFor}");
        Debug.Print($"Parallel For Batches:         {durationParallelForBatches}");
        Debug.Print($"Parallel ForEach:             {durationParallelForEach}");
        Debug.Print($"Parallel ForEach Batches:     {durationParallelForEachBatches}");
        Debug.Print($"");
    }