ParallelQuery.Aggregate 没有 运行 并行的可能原因
Possible reasons why ParallelQuery.Aggregate does not run in parallel
非常感谢 PLYNQ 专家的任何帮助!我会花时间查看答案,我在 math.SE.
上有更成熟的个人资料
我有一个 ParallelQuery<List<string>>
类型的对象,它有 44 个我想并行处理的列表(比如一次五个)。
我的进程有一个像
这样的签名
private ProcessResult Process(List<string> input)
处理后会return一个结果,是一对布尔值,如下。
private struct ProcessResult
{
public ProcessResult(bool initialised, bool successful)
{
ProcessInitialised = initialised;
ProcessSuccessful = successful;
}
public bool ProcessInitialised { get; }
public bool ProcessSuccessful { get; }
}
问题。 给定 IEnumerable<List<string>> processMe
,我的 PLYNQ 查询尝试实现此方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx。写成
processMe.AsParallel()
.Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
(
new ConcurrentStack<ProcessResult>, //aggregator seed
(agg, input) =>
{ //updating the aggregate result
var res = Process(input);
agg.Push(res);
return agg;
},
agg =>
{ //obtain the result from the aggregator agg
ProcessResult res; // (in this case just the most recent result**)
agg.TryPop(out res);
return res;
}
);
不幸的是,它不是运行并行的,只是顺序的。 (** 请注意,此实现不会 "sense",我现在只是想让并行化工作。)
我尝试了一个稍微不同的实现, 运行 并行执行,但没有聚合。我定义了一个聚合方法(本质上是 ProcessResult
两部分的布尔 AND,即 aggregate([A1, A2], [B1, B2]) ≡ [A1 && B1, A2 && B2]).
private static ProcessResult AggregateProcessResults
(ProcessResult aggregate, ProcessResult latest)
{
bool ini = false, suc = false;
if (aggregate.ProcessInitialised && latest.ProcessInitialised)
ini = true;
if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
suc = true;
return new ProcessResult(ini, suc);
}
并使用了 PLYNQ 查询 https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx
.Aggregate<List<string>, ProcessResult, ProcessResult>(
new ProcessResult(true, true),
(res, input) => Process(input),
(agg, latest) => AggregateProcessResults(agg, latest),
agg => agg
这里的问题是 AggregateProcessResults
代码从未被命中,出于某种原因——我对结果的去向一无所知...
感谢阅读,感谢您的帮助:)
您使用的 Aggregate
过载确实不会 运行 并行,按设计。您传递种子,然后传递函数,但传递函数 (agg
) 的参数是从 previous 步骤接收到的累加器。出于这个原因,它本质上是顺序的(上一步的结果输入到下一步)并且不可并行化。不确定为什么这个重载包含在 ParallelEnumerable
中,但可能是有原因的。
相反,使用另一个重载:
var result = processMe
.AsParallel()
.Aggregate
(
// seed factory. Each partition will call this to get its own seed
() => new ConcurrentStack<ProcessResult>(),
// process element and update accumulator
(agg, input) =>
{
var res = Process(input);
agg.Push(res);
return agg;
},
// combine accumulators from different partitions
(agg1, agg2) => {
agg1.PushRange(agg2.ToArray());
return agg1;
},
// reduce
agg =>
{
ProcessResult res;
agg.TryPop(out res);
return res;
}
);
非常感谢 PLYNQ 专家的任何帮助!我会花时间查看答案,我在 math.SE.
上有更成熟的个人资料我有一个 ParallelQuery<List<string>>
类型的对象,它有 44 个我想并行处理的列表(比如一次五个)。
我的进程有一个像
private ProcessResult Process(List<string> input)
处理后会return一个结果,是一对布尔值,如下。
private struct ProcessResult
{
public ProcessResult(bool initialised, bool successful)
{
ProcessInitialised = initialised;
ProcessSuccessful = successful;
}
public bool ProcessInitialised { get; }
public bool ProcessSuccessful { get; }
}
问题。 给定 IEnumerable<List<string>> processMe
,我的 PLYNQ 查询尝试实现此方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx。写成
processMe.AsParallel()
.Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
(
new ConcurrentStack<ProcessResult>, //aggregator seed
(agg, input) =>
{ //updating the aggregate result
var res = Process(input);
agg.Push(res);
return agg;
},
agg =>
{ //obtain the result from the aggregator agg
ProcessResult res; // (in this case just the most recent result**)
agg.TryPop(out res);
return res;
}
);
不幸的是,它不是运行并行的,只是顺序的。 (** 请注意,此实现不会 "sense",我现在只是想让并行化工作。)
我尝试了一个稍微不同的实现, 运行 并行执行,但没有聚合。我定义了一个聚合方法(本质上是 ProcessResult
两部分的布尔 AND,即 aggregate([A1, A2], [B1, B2]) ≡ [A1 && B1, A2 && B2]).
private static ProcessResult AggregateProcessResults
(ProcessResult aggregate, ProcessResult latest)
{
bool ini = false, suc = false;
if (aggregate.ProcessInitialised && latest.ProcessInitialised)
ini = true;
if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
suc = true;
return new ProcessResult(ini, suc);
}
并使用了 PLYNQ 查询 https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx
.Aggregate<List<string>, ProcessResult, ProcessResult>(
new ProcessResult(true, true),
(res, input) => Process(input),
(agg, latest) => AggregateProcessResults(agg, latest),
agg => agg
这里的问题是 AggregateProcessResults
代码从未被命中,出于某种原因——我对结果的去向一无所知...
感谢阅读,感谢您的帮助:)
您使用的 Aggregate
过载确实不会 运行 并行,按设计。您传递种子,然后传递函数,但传递函数 (agg
) 的参数是从 previous 步骤接收到的累加器。出于这个原因,它本质上是顺序的(上一步的结果输入到下一步)并且不可并行化。不确定为什么这个重载包含在 ParallelEnumerable
中,但可能是有原因的。
相反,使用另一个重载:
var result = processMe
.AsParallel()
.Aggregate
(
// seed factory. Each partition will call this to get its own seed
() => new ConcurrentStack<ProcessResult>(),
// process element and update accumulator
(agg, input) =>
{
var res = Process(input);
agg.Push(res);
return agg;
},
// combine accumulators from different partitions
(agg1, agg2) => {
agg1.PushRange(agg2.ToArray());
return agg1;
},
// reduce
agg =>
{
ProcessResult res;
agg.TryPop(out res);
return res;
}
);