包含异步和同步的并行循环

Parallel loop containing both async and synchronous

我有一个循环需要 运行 并行,因为每次迭代都很慢并且处理器密集,但我还需要调用异步方法作为循环中每次迭代的一部分。

我看到了关于如何在循环中处理异步方法的问题,但没有看到关于异步和同步的组合的问题,这就是我所得到的。

我的(简化的)代码如下 - 我知道由于将异步操作传递给 foreach,这将无法正常工作。

protected IDictionary<int, ReportData> GetReportData()
{
    var results = new ConcurrentDictionary<int, ReportData>();
      
    Parallel.ForEach(requestData, async data =>
    {
        // process data synchronously
        var processedData = ProcessData(data);

        // get some data async
        var reportRequest = await BuildRequestAsync(processedData);

        // synchronous building
        var report = reportRequest.BuildReport();

        results.TryAdd(data.ReportId, report);
     });

     // This needs to be populated before returning
     return results;
}

当操作必须异步以等待单个异步调用时,是否有任何方法可以并行执行操作。

将同步函数转换为异步函数不是一个实用的选择。

我不想将操作分开,然后有一个 Parallel.ForEach 后跟一个 WhenAll 和另一个 Parallel.ForEach 的异步调用,因为每个阶段的速度在不同的迭代之间可能会有很大差异所以拆分它是低效的,因为较快的将等待较慢的在继续之前。

我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach,但我从未使用过 PLINQ,也不确定它是否会等待所有迭代完成后再返回,即任务是否会在过程结束时仍然 运行ning。

Is there any way to get execute the action in parallel when the action has to be async in order to await the single async call.

是的,但是您需要了解 Parallel 给您带来了什么,而当您采取其他方法时,您会失去什么。具体来说,Parallel 将自动确定合适的线程数并根据使用情况进行调整。

It's not a practical option to convert the synchronous functions to async.

对于 CPU 绑定方法,您不应该 转换它们。

I don't want to split the action up and have a Parallel.ForEach followed by the async calls with a WhenAll and another Parallel.ForEach as the speed of each stage can vary greatly between different iterations so splitting it would be inefficient as the faster ones would be waiting for the slower ones before continuing.

我要提出的第一个建议是研究 TPL Dataflow。它允许您定义各种“管道”,在限制每个阶段的并发性的同时保持数据流动。

I did wonder if a PLINQ ForAll could be used instead of the Parallel.ForEach

没有。 PLINQ 在工作方式上与 Parallel 非常相似。它们在 CPU 利用率方面存在一些差异,并且存在一些 API 差异 - 例如,如果您有一组最终结果,PLINQ 通常比 Parallel 更干净- 但从高层次的角度来看,它们非常相似。两者都只适用于同步代码。

但是,您可以像这样使用带有 Task.WhenAll 的简单 Task.Run

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
  var tasks = requestData.Select(async data => Task.Run(() =>
  {
    // process data synchronously
    var processedData = ProcessData(data);

    // get some data async
    var reportRequest = await BuildRequestAsync(processedData);

    // synchronous building
    var report = reportRequest.BuildReport();

    return (Key: data.ReportId, Value: report);
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key, x => x.Value);
}

您可能需要应用并发限制(Parallel 会为您完成)。在异步世界中,这看起来像:

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
  var throttle = new SemaphoreSlim(10);
  var tasks = requestData.Select(data => Task.Run(async () =>
  {
    await throttle.WaitAsync();
    try
    {
      // process data synchronously
      var processedData = ProcessData(data);

      // get some data async
      var reportRequest = await BuildRequestAsync(processedData);

      // synchronous building
      var report = reportRequest.BuildReport();

      return (Key: data.ReportId, Value: report);
    }
    finally
    {
      throttle.Release();
    }
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key, x => x.Value);
}