Parallel.ForEach 使用异步 lambda 等待所有迭代完成

Parallel.ForEach with async lambda waiting forall iterations to complete

最近我看到几个与 Parallel.ForEach 相关的 SO 线程与异步 lambda 混合,但所有建议的答案都是某种变通方法。

有什么办法可以这样写:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});

我如何确保列表包含每次迭代中使用 lambda 执行的所有迭代中的所有项目?

通常 Parallel.ForEach 如何与异步 lambda 一起工作,如果它命中 await,它会将其线程移交给下一次迭代吗?

我假设 ParallelLoopResult IsCompleted 字段不是正确的字段,因为当所有迭代都执行时它将 return 为真,无论它们的实际 lambda 作业是否完成?

你不需要Parallel.For/ForEach在这里你只需要等待一个任务列表。

背景

简而言之,您需要非常小心 async lambdas,如果您将它们传递给 ActionFunc<Task>

您的问题是因为 Parallel.For / ForEach 不适合 async 和等待模式 IO 绑定任务 。它们适用于 cpu 绑定工作负载。这意味着它们基本上有 Action 参数,让 任务调度程序 为您创建 任务

如果您想 运行 多个 async 任务同时使用 Task.WhenAllTPL Dataflow Block(或类似的东西)可以有效地处理 CPU boundIO bound 工作负荷,或者更直接地说,他们可以处理 任务 这就是 异步方法 的含义。

除非你需要在你的 lambda 中做更多的事情(你没有展示),只需使用 SelectWhenAll

var tasks = items .Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // here is your list of int

如果这样做,您仍然可以使用 await,

var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);

注意:如果需要Parallel.ForEach的扩展功能(即控制最大并发数的选项),有有几种方法,但是 RX 或 DataFlow 可能是最简洁的

recently I have seen several SO threads related to Parallel.ForEach mixed with async lambdas, but all proposed answers were some kind of workarounds.

嗯,那是因为 Parallel async 不起作用 。从不同的角度来看,您为什么首先要混合它们?他们做相反的事情Parallel 就是添加线程,async 就是放弃线程。如果要并发进行异步工作,则使用 Task.WhenAll。这是完成这项工作的正确工具; Parallel 不是。

话虽这么说,但您似乎想使用错误的工具,所以这就是您的操作方法...

How can I ensure that list will contain all items from all iterations executed withing lambdas in each iteration?

您需要有某种信号,某些代码可以阻止该信号直到处理完成,例如 CountdownEventMonitor。另外,您还需要保护对非线程安全 List<T> 的访问。

How will generally Parallel.ForEach work with async lambdas, if it hit await will it hand over its thread to next iteration?

由于 Parallel 不理解 async lambda,当第一个 await 向其调用者产生 (returns) 时,Parallel 将假设循环交互完成。

I assume ParallelLoopResult IsCompleted field is not proper one, as it will return true when all iterations are executed, no matter if their actual lambda jobs are finished or not?

正确。据 Parallel 所知,它只能 "see" 方法到第一个 await 那个 returns 到它的调用者。所以它不知道 async lambda 何时完成。它还将假设迭代过早完成,这会导致分区关闭。