为什么并行处理对于 C# 中的第一次调用要慢得多?

Why is a parallel-processing much slower for a first call in C#?

我正在尝试使用 C# 应用程序尽可能快地处理数字。我用一个Thread.Sleep()来模拟一个处理和随机数。我使用了 3 种不同的技术。

这是我使用的测试代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    internal class Program
    {
        private static void Main()
        {
            var data = new int[500000];
            var random = new Random();

            for (int i = 0; i < 500000; i++)
            {
                data[i] = random.Next();
            }

            var partialTimes = new Dictionary<int, double>();
            var iterations = 5;

            for (int i = 1; i < iterations + 1; i++)
            {
                Console.Write($"ProcessData3 {i}\t");
                StartProcessing(data, partialTimes, ProcessData3);
                GC.Collect();
            }

            Console.WriteLine();
            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();
        }

        private static void StartProcessing(int[] data, Dictionary<int, double> partialTimes, Action<int[], Dictionary<int, double>> processData)
        {
            var stopwatch = Stopwatch.StartNew();

            try
            {
                processData?.Invoke(data, partialTimes);
                stopwatch.Stop();

                Console.WriteLine($"{stopwatch.Elapsed.ToString(@"mm\:ss\:fffffff")} total = {partialTimes.Sum(s => s.Value)} max = {partialTimes.Values.Max()}");
            }
            finally
            {
                partialTimes.Clear();
            }
        }

        private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
        {
            Parallel.ForEach(data, number =>
            {
                var partialStopwatch = Stopwatch.StartNew();

                Thread.Sleep(1);

                partialStopwatch.Stop();

                lock (partialTimes)
                {
                    partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                }
            });
        }

        private static void ProcessData3(int[] data, Dictionary<int, double> partialTimes)
        {
            // Partition the entire source array.
            var rangePartitioner = Partitioner.Create(0, data.Length);

            // Loop over the partitions in parallel.
            Parallel.ForEach(rangePartitioner, (range, loopState) =>
            {
                // Loop over each range element without a delegate invocation.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    var number = data[i];
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                }
            });
        }

        private static void ProcessData2(int[] data, Dictionary<int, double> partialTimes)
        {
            var tasks = new Task[data.Count()];
            for (int i = 0; i < data.Count(); i++)
            {
                var number = data[i];

                tasks[i] = Task.Factory.StartNew(() =>
                {
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}

对于每种技术,我都会重新启动程序。我得到了这些结果,
Thread.Sleep( 1 ):

ProcessData1 1  00:56:1796688 total = 801335,282599955 max = 16,8783
ProcessData1 2  00:23:5390014 total = 816167,642100022 max = 14,5913
ProcessData1 3  00:14:7090566 total = 827589,675899998 max = 13,2617
ProcessData1 4  00:10:8929177 total = 829296,528300007 max = 15,0175
ProcessData1 5  00:10:6333310 total = 839282,123200008 max = 29,2738

ProcessData2 1  00:37:8084153 total = 824507,174200022 max = 112,071
ProcessData2 2  00:16:3762096 total = 849272,47810001  max = 77,1514
ProcessData2 3  00:12:9177717 total = 854012,353100029 max = 67,5684
ProcessData2 4  00:10:4798701 total = 857396,642899983 max = 92,9408
ProcessData2 5  00:09:2206146 total = 870966,655499989 max = 51,8945

ProcessData3 1  01:13:6814541 total = 803581,718699918 max = 25,6815
ProcessData3 2  01:07:9809277 total = 814069,532899922 max = 26,0671
ProcessData3 3  01:07:9857984 total = 814148,329399928 max = 21,3116
ProcessData3 4  01:07:4812183 total = 808042,695499966 max = 16,8601
ProcessData3 5  01:07:2954614 total = 805895,325499903 max = 23,8517

在哪里
total 是在每个 Parallel.ForEach() 函数中花费的总时间和
max是每个函数的最大时间。

为什么第一个循环这么慢?其他尝试怎么可能处理得这么快?如何在第一次尝试时实现更快的并行处理?


编辑:

所以我也尝试了 Thread.Sleep( 10 )
结果是:

ProcessData1 1  02:50:2845698 total = 5109831,95429994 max = 12,0612
ProcessData1 2  00:56:3361645 total = 5125884,05919954 max = 12,7666
ProcessData1 3  00:53:4911541 total = 5131105,15209993 max = 12,7486
ProcessData1 4  00:49:5665628 total = 5144654,75829992 max = 13,2678
ProcessData1 5  00:46:0218194 total = 5152955,19509996 max = 13,702

ProcessData2 1  01:21:7207557 total = 5121889,31579983 max = 73,8152
ProcessData2 2  00:39:6660074 total = 5175557,68889969 max = 59,369
ProcessData2 3  00:31:9036416 total = 5193819,89889973 max = 56,2895
ProcessData2 4  00:27:4616803 total = 5207168,56969977 max = 65,5495
ProcessData2 5  00:24:4270755 total = 5222567,9044998  max = 65,368

ProcessData3 1  02:44:9985645 total = 5110117,19019997 max = 11,7172
ProcessData3 2  02:25:6533128 total = 5237779,27010012 max = 26,3171
ProcessData3 3  02:22:2771259 total = 5116123,45259975 max = 12,0581
ProcessData3 4  02:22:1678911 total = 5112574,93779995 max = 11,5334
ProcessData3 5  02:21:9418178 total = 5104980,07120004 max = 11,5583

所以第一个循环仍然比其他循环花费更多的秒数..

您看到的行为完全可以通过以下事实来解释:ThreadPool class 会延迟创建新线程,直到经过一小段时间(大约 1 秒......这些年来发生了变化)。

向程序中添加检测可以提供信息。在您的示例中,一个非常有用的工具是计算线程池管理的并发线程数,确定 "high water mark" (即它最终确定的最大线程数),然后使用该数字覆盖线程池的行为。

当我这样做时,我发现在第一个方法的第一个 运行 中,您最多可以获得大约 25 个线程。但是由于线程池的默认设置是只创建与您计算机上的核心数量相等的线程(在我的例子中是八个),创建额外的线程可能需要相当长的时间。当然,在那段时间里,您获得的吞吐量明显低于其他方式(因此您会产生比达到该线程数仅 20 秒左右的延迟更大的延迟)。

在该测试的后续 运行 中,线程的最大数量逐渐增加(因为每个新的 运行 已经从线程池中的更多线程开始,从之前的 运行) 高达 53 左右。

如果您事先知道线程池需要多少线程才能高效地执行您的工作,您可以使用 SetMinThreads() 方法来增加它会根据需要立即创建的线程数在切换到节流线程创建算法之前。例如,手头有 53 个线程的高水位线,您可以将最小线程数设置为该数字(或一个不错的循环数,如 50)。

当我这样做时,你第一次测试的所有五个 运行s,以前需要 25 秒到 1 分钟(当然,较长的 运行s 更早),采取大约 19 秒即可完成。

我想强调的是,您应该非常小心地使用 SetMinThreads()。一般来说,线程池非常适合管理工作负载。您在上面呈现的场景显然只是为了举例而不现实,但它确实存在问题,即您首先在每个 Parallel.ForEach() 迭代中并没有真正做那么多工作。它似乎不太适合并发,因为花费的大部分时间都在开销上。在任何类似场景中使用 SetMinThreads() 只是掩盖了一个更隐蔽的潜在问题。

您会发现,如果您定制工作负载以更好地匹配可用资源,并最大限度地减少任务和线程之间的转换,则无需覆盖默认线程池编号即可获得良好的吞吐量。


关于此特定测试的其他一些注意事项...

请注意,如果您将程序更改为 运行 同一会话中的所有三个测试(每个 运行s),"first run is longer" 仅在第一个测试中发生。为了将来参考,您应该始终着眼于测试不同的组合和顺序来处理此类 "first time is slower" 问题,以验证它是否是受此影响的特定实现,或者您是否看到第一次测试的效果,不管哪个实现是 运行 首先。有许多实现和平台细节,包括 JIT、线程池、磁盘缓存,它们会影响 any 算法的初始 运行,您需要确保您可以快速缩小搜索范围,以了解您是在处理其中一个问题还是您自己的算法中的某些真正问题。

顺便说一句,这对你的问题来说并不重要,但我发现你选择使用 data 数组中的随机数作为你的时间字典的键很奇怪。由于随机数中的冲突,恕我直言,这些计时值变得无用。您不会每次都计算(发生碰撞时,只会存储该数字的最后一个实例),这意味着显示的 "total" 时间少于实际花费的总时间,甚至最大值也会获胜' 一定是正确的(如果真正的最大值被使用相同键的后来的值覆盖,你会错过它)。


这是我对您的第一个测试的修改版本,它显示了我添加的诊断代码和(注释掉的)设置线程池计数以产生更快、更一致的行为的语句:

private static int _threadCount1;
private static int _maxThreadCount1;

private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
{
    const int minOverride = 50;
    int minMain, minIOCP, maxMain, maxIOCP;

    ThreadPool.GetMinThreads(out minMain, out minIOCP);
    ThreadPool.GetMaxThreads(out maxMain, out maxIOCP);

    WriteLine($"cores: {Environment.ProcessorCount}");
    WriteLine($"threads: {minMain} min, {maxMain} max");

    // Uncomment two lines below to see uniform behavior across test runs:

    //ThreadPool.SetMinThreads(minOverride, minIOCP);
    //ThreadPool.SetMaxThreads(minOverride, maxIOCP);

    _threadCount1 = _maxThreadCount1 = 0;

    Parallel.ForEach(data, number =>
    {
        int threadCount = Interlocked.Increment(ref _threadCount1);

        var partialStopwatch = Stopwatch.StartNew();

        Thread.Sleep(1);

        partialStopwatch.Stop();

        lock (partialTimes)
        {
            partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
            if (_maxThreadCount1 < threadCount)
            {
                _maxThreadCount1 = threadCount;
            }
        }

        Interlocked.Decrement(ref _threadCount1);
    });

    ThreadPool.SetMinThreads(minMain, minIOCP);
    ThreadPool.SetMaxThreads(maxMain, maxIOCP);
    WriteLine($"max thread count: {_maxThreadCount1}");
}