Parallel.For() 和 Interlocked.CompareExchange():性能较差,结果与串行版本略有不同

Parallel.For() with Interlocked.CompareExchange(): poorer performance and slightly different results to serial version

我尝试使用 Parallel.For() 计算列表的平均值。我决定反对它,因为它比简单的串行版本慢大约四倍。但令我感兴趣的是,它并没有产生与连续剧完全相同的结果,我认为了解原因会很有启发性。

我的代码是:

public static double Mean(this IList<double> list)
{
        double sum = 0.0;


        Parallel.For(0, list.Count, i => {
                        double initialSum;
                        double incrementedSum;
                        SpinWait spinWait = new SpinWait();

                        // Try incrementing the sum until the loop finds the initial sum unchanged so that it can safely replace it with the incremented one.
                        while (true) {
                            initialSum = sum;
                            incrementedSum = initialSum + list[i];
                            if (initialSum == Interlocked.CompareExchange(ref sum, incrementedSum, initialSum)) break;
                            spinWait.SpinOnce();
                        }
                     });

        return sum / list.Count;
    }

当我 运行 随机序列 2000000 点的代码时,我得到的结果在最后 2 位数字与序列平均值不同。

我搜索了 Whosebug,发现了这个:VB.NET running sum in nested loop inside Parallel.for Synclock loses information。但是,我的情况与那里描述的情况不同。线程局部变量 temp 是不准确的原因,但我使用根据教科书 Interlocked.CompareExchange() 模式更新(我希望)的单个总和。由于性能不佳(这让我感到惊讶,但我知道开销),这个问题当然没有实际意义,但我很好奇是否可以从这个案例中学到一些东西。

感谢您的想法。

使用 double 是潜在的问题,您可以通过使用 long 来更好地了解同步不是原因。你得到的结果实际上是正确的,但这永远不会让程序员高兴。

您发现浮点数学是可以交流的,但是 not associative。或者换句话说,a + b == b + aa + b + c != a + c + b。在您的代码中暗示数字的添加顺序是非常随机的。

This C++ question也讲了。

准确性问题在其他答案中得到了很好的解决,所以我不会在这里重复它,另外就是说永远不要相信浮点值的低位。相反,我将尝试解释您所看到的性能影响以及如何避免它。

由于您没有显示顺序代码,我假设最简单的情况:

double sum = list.Sum();

这是一个非常简单的操作,应该可以在一个 CPU 核心上尽可能快地运行。对于非常大的列表,似乎应该可以利用多个内核来对列表求和。而且,事实证明,您可以:

double sum = list.AsParallel().Sum();

在我的笔记本电脑上运行几次(具有 2 cores/4 逻辑过程的 i3)在针对 200 万个随机数(相同列表,多次运行)的多次运行中产生了大约 2.6 倍的加速。

然而,您的代码比上面的简单情况慢得多。不是简单地将列表分成独立求和的块,然后对结果求和,而是引入各种阻塞和等待,以便让所有线程更新单个 运行 总和。

那些额外的等待、支持它们的更复杂的代码、创建对象和为垃圾收集器添加更多工作都会导致更慢的结果。您不仅在列表中的每个项目上浪费了大量时间,而且实际上是通过让程序等待其他线程将 sum 变量单独留给您足够长的时间来强制程序执行顺序操作更新它。

假设您实际执行的操作比简单的 Sum() 可以处理的更复杂,您可能会发现 Aggregate() 方法比 Parallel.For 对您更有用。

Aggregate 扩展有多个重载,其中一个实际上是 Map Pattern implementation, with similarities to how bigdata systems like MapReduce work. Documentation is here

此版本的 Aggregate 使用累加器种子(每个线程的起始值)和三个函数:

    为序列中的每个项目调用
  1. updateAccumulatorFunc 并且 returns 更新的累加器值

  2. combineAccumulatorsFunc 用于组合并行枚举中每个分区(线程)的累加器

  3. resultSelector从累加结果中选择最终输出值

使用此方法的并行求和看起来像这样:

double sum = list.AsParallel().Aggregate(
    // seed value for accumulators
    (double)0, 
    // add val to accumulator
    (acc, val) => acc + val,
    // add accumulators
    (acc1, acc2) => acc1 + acc2,
    // just return the final accumulator
    acc => acc
);

对于工作正常的简单聚合。对于使用非平凡累加器的更复杂的聚合,有一个 variant 接受为初始状态创建累加器的函数。这在 Average 实现中很有用:

public class avg_acc
{
    public int count;
    public double sum;
}

public double ParallelAverage(IEnumerable<double> list)
{
    double avg = list.AsParallel().Aggregate(
        // accumulator factory method, called once per thread:
        () => new avg_acc { count = 0, sum = 0 },
        // update count and sum
        (acc, val) => { acc.count++; acc.sum += val; return acc; },
        // combine accumulators
        (ac1, ac2) => new avg_acc { count = ac1.count + ac2.count, sum = ac1.sum + ac2.sum },
        // calculate average
        acc => acc.sum / acc.count
    );
    return avg;
}

虽然不如标准 Average 扩展快(比顺序快约 1.5 倍,比并行慢 1.6 倍)这表明您可以如何并行执行相当复杂的操作而无需锁定输出或等待在其他线程上停止干扰它们,以及如何使用复杂的累加器来保存中间结果。