ParallelEnumerable.Aggregate 几种方法

ParallelEnumerable.Aggregate for several methods

开始学习多线程。有 3 种方法来计算数组的平方根的和、平均值和乘积。

首先,我使用 PLINQ 进行了三个独立的阻塞调用。然后我想,如果能够在一次调用中实现它,并且 return 一个对象同时具有求和、乘积和平均值,那就太好了。我读到 ParallelEnumerable.Aggregate 可以帮助我解决这个问题,但我完全不知道如何使用它。

如果能解释一下如何在我的案例中使用此功能,good/bad 方面的方法,我将不胜感激。

public static double Average(double[] array, string tool)
        {
            if (array == null) throw new ArgumentNullException(nameof(array));
            double sum = Enumerable.Sum(array);
            double result = sum / array.Length;
            Print(tool, result);
            return result;
        }

        public static double Sum(double[] array, string tool)
        {
            if (array == null) throw new ArgumentNullException(nameof(array));
            double sum = Enumerable.Sum(array);
            Print(tool, sum);
            return sum;
        }

        public static void ProductOfSquareRoots(double[] array, string tool)
        {
            if (array == null) throw new ArgumentNullException(nameof(array));
            double result = 1;
            foreach (var number in array)
            {
                result = result * Math.Sqrt(number);
            }
            Print(tool, result);
        }

注意:您必须使用值 1 初始化 result 变量,否则您将始终得到 0。

注2:不用Enumerable.Sum(array),直接写array.Sum().

不,Aggregate 方法不会帮助您同时计算三个函数。 参见 Martin Liversage 答案。


KISS;)

if (array == null) throw new ArgumentNullException(nameof(array));

var sum = array.Sum();
var average = array.Average();
var product = array.Aggregate(1.0, (acc, val) => acc * Math.Sqrt(val));

可以简化:

var average = sum / array.Length;

这消除了对数组的额外遍历。


想要并行化?

var sum = array.AsParallel().Sum();
//var average = array.AsParallel().Average(); // Extra pass!
var average = sum / array.Length; // More fast! Really!
var product = array.AsParallel().Aggregate(1.0, (acc, val) => acc * Math.Sqrt(val));

但是,它可能会比以前的方法慢。这种并行仅适用于非常大的 collections,包含数十亿个元素。


每次通过 collection 都需要时间。通过次数越少,性能越好。在计算平均值时,我们已经处理掉了一个。让我们只做一个。

double sum = 0;
double product = 1;

foreach (var number in array)
{
    sum += number;
    product = product * Math.Sqrt(number);
}

double average = sum / array.Length;

一关三果!我们是最棒的!


言归正传

Parallel.Invoke 方法允许您并行执行多个函数,但不会从中获取结果。适用于"fire and forget".

类型的计算

我们可以通过 运行 多个任务并行计算。在Task.WhenAll的帮助下等待他们全部完成并得到结果。

var results = await Task.WhenAll(
        Task.Run(() => array.Sum()),
        Task.Run(() => array.Average()),
        Task.Run(() => array.Aggregate(1.0, (acc, val) => acc * Math.Sqrt(val)))
    );

var sum = results[0];
var average = results[1];
var product = results[2];

对小尺寸也无效collection。但在某些情况下,它可能比 AsParallel 更有效。

用任务编写此方法的另一种方式。或许看起来更清晰。

var sumTask = Task.Run(() => array.Sum());
var avgTask = Task.Run(() => array.Average());
var prodTask = Task.Run(() => array.Aggregate(1.0, (acc, val) => acc * Math.Sqrt(val)));

Task.WaitAll(sumTask, avgTask, prodTask);

sum = sumTask.Result;
average = avgTask.Result;
product = prodTask.Result;

您要计算的三个聚合值(平均值、总和和平方根的乘积)都可以通过对数字执行单次传递来计算。您可以这样做一次并在循环内聚合三个值,而不是执行三次(每个聚合值一次)(这样可以节省时间)。

平均值是总和除以计数,因为您已经在计算总和,所以您只需要计数就可以得到平均值。如果你知道输入的大小,你甚至不必计算项目,但这里我假设输入的大小事先是未知的。

如果你想使用 LINQ,你可以使用 Aggregate:

var aggregate = numbers.Aggregate(
    // Starting value for the accumulator.
    (Count: 0, Sum: 0D, ProductOfSquareRoots: 1D),
    // Update the accumulator with a specific number.
    (accumulator, number) =>
    {
        accumulator.Count += 1;
        accumulator.Sum += number;
        accumulator.ProductOfSquareRoots *= Math.Sqrt(number);
        return accumulator;
    });

变量 aggregate 是一个包含项目 CountSumProductOfSquareRootsValueTuple<int, double, double>。在 C# 7 之前,您将使用匿名类型。但是,这将需要为输入序列中的每个值进行分配,从而减慢聚合速度。通过使用可变值元组,聚合应该变得更快。

Aggregate 与 PLINQ 一起工作,因此如果 numbersParallelQuery<T> 类型而不是 IEnumerable<T> ,那么聚合将并行执行。请注意,这要求聚合既是关联的(例如 (a + b) + c = a + (b + c) 又是可交换的(例如 a + b = b + a),这在您的情况下是正确的。

PLINQ 有开销,因此与单线程 LINQ 相比,它的性能可能不会更好,具体取决于序列中的元素数量和计算的复杂程度。您必须自己对此进行测量以确定 PLINQ 是否加快了速度。但是,您可以在 LINQ 和 PLINQ 中使用相同的 Aggregate 表达式,通过在正确的位置插入 AsParallel() 使您的代码易于从单线程切换到并行。