ParallelQuery.Aggregate 没有 运行 并行的可能原因

Possible reasons why ParallelQuery.Aggregate does not run in parallel

非常感谢 PLYNQ 专家的任何帮助!我会花时间查看答案,我在 math.SE.

上有更成熟的个人资料

我有一个 ParallelQuery<List<string>> 类型的对象,它有 44 个我想并行处理的列表(比如一次五个)。 我的进程有一个像

这样的签名
private ProcessResult Process(List<string> input)

处理后会return一个结果,是一对布尔值,如下。

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

问题。 给定 IEnumerable<List<string>> processMe,我的 PLYNQ 查询尝试实现此方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx。写成

processMe.AsParallel()
         .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
             (
                 new ConcurrentStack<ProcessResult>,   //aggregator seed
                 (agg, input) =>
                 {                         //updating the aggregate result
                     var res = Process(input);
                     agg.Push(res);
                     return agg;
                 },
                 agg => 
                 {                         //obtain the result from the aggregator agg
                     ProcessResult res;    // (in this case just the most recent result**)
                     agg.TryPop(out res);
                     return res;
                 }
             );

不幸的是,它不是运行并行的,只是顺序的。 (** 请注意,此实现不会 "sense",我现在只是想让并行化工作。)


我尝试了一个稍微不同的实现, 运行 并行执行,但没有聚合。我定义了一个聚合方法(本质上是 ProcessResult 两部分的布尔 AND,即 aggregate([A1, A2], [B1, B2]) ≡ [A1 && B1, A2 && B2]).

private static ProcessResult AggregateProcessResults
        (ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised)
            ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
            suc = true;


        return new ProcessResult(ini, suc);
    }

并使用了 PLYNQ 查询 https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true),
    (res, input)  => Process(input),
    (agg, latest) => AggregateProcessResults(agg, latest),
    agg           => agg

这里的问题是 AggregateProcessResults 代码从未被命中,出于某种原因——我对结果的去向一无所知...

感谢阅读,感谢您的帮助:)

您使用的 Aggregate 过载确实不会 运行 并行,按设计。您传递种子,然后传递函数,但传递函数 (agg) 的参数是从 previous 步骤接收到的累加器。出于这个原因,它本质上是顺序的(上一步的结果输入到下一步)并且不可并行化。不确定为什么这个重载包含在 ParallelEnumerable 中,但可能是有原因的。

相反,使用另一个重载:

var result = processMe
.AsParallel()
.Aggregate
(
    // seed factory. Each partition will call this to get its own seed
    () => new ConcurrentStack<ProcessResult>(),
    // process element and update accumulator
    (agg, input) =>
    {                                           
        var res = Process(input);
        agg.Push(res);
        return agg;
    },
    // combine accumulators from different partitions
    (agg1, agg2) => {
        agg1.PushRange(agg2.ToArray());
        return agg1;
    },
    // reduce
    agg =>
    {
        ProcessResult res;
        agg.TryPop(out res);
        return res;
    }
);