具有异步任务和相关 Post 异步任务的 C# ForEach 循环

C# ForEach Loop With ASync Tasks & Dependent Post ASync Tasks

我无法尝试正确构建最有效的方法来迭代从请求对象启动的多个异步任务,然后执行一些其他依赖于请求对象和第一个异步任务结果的异步任务.我是 运行 AWS 中的 C# lambda 函数。我试过这样的模型(为简洁起见省略了错误处理等):

public async Task MyAsyncWrapper()
{
  List<Task> Tasks = new List<Task>();
  foreach (var Request in Requests) 
  {
    var Continuation = this.ExecuteAsync(Request).ContinueWith(async x => {
      var KeyValuePair<bool, string> Result = x.Result;
      if (Result.Key == true)
      {
        await this.DoSomethingElseAsync(Request.Id, Request.Name, Result.Value);
        Console.WriteLine("COMPLETED");
      }
    }

    Tasks.Add(Continuation);
  }

  Task.WaitAll(Tasks.ToArray());
}

这种方法导致 DoSomethingElseAsync() 方法并没有真正得到等待,并且在我的很多 Lambda 函数调用中,我从未得到 "COMPLETED" 输出。我也用这种方法解决了这个问题:

public async Task MyAsyncWrapper()
{
  foreach (var Request in Requests) 
  {
    KeyValuePair<bool, string> Result = await this.ExecuteAsync(Request);

    if (Result.Key == true)
    {
      await this.DoSomethingElseAsync(Request.Id, Request.Name, Result.Value);
      Console.WriteLine("COMPLETED");
    }
  }
}

这可行,但我认为这是浪费,因为在等待 asnyc 完成时我只能执行循环的一次迭代。我也引用了 Interleaved Tasks 但问题是我基本上有两个循环,一个用于填充任务,另一个用于在任务完成后迭代它们,我无法访问原始 Request 对象了。所以基本上是这样的:

List<Task<KeyValuePair<bool, string>>> Tasks = new List<Task<KeyValuePair<bool, string>>>();

foreach (var Request in Requests)
{
  Tasks.Add(ths.ExecuteAsync(Request);
}

foreach (Task<KeyValuePair<bool, string>> ResultTask in Tasks.Interleaved())
{
  KeyValuePair<bool, string> Result = ResultTask.Result;
  //Can't access the original request for this method's parameters
  await this.DoSomethingElseAsync(???, ???, Result.Value);
}

关于在 foreach 循环中实现这种类型的异步链接的更好方法有什么想法吗?我理想的方法是 return 返回请求对象作为来自 ExecuteAsync() 的响应的一部分,所以我想尽可能地尝试找到其他选项。

考虑使用 TPL 数据流:

var a = new TransformBlock<Input, OutputA>(async Input i=>
{
    // do something async.
    return new OutputA();
});

var b = new TransformBlock<OutputA, OutputB>(async OutputA i =>
{
    // do more async.
    return new OutputB();
});

var c = new ActionBlock<OutputB>(async OutputB i =>
{
    // do some final async.
});

a.LinkTo(b, new DataflowLinkOptions { PropogateCompletion = true });
b.LinkTo(c, new DataflowLinkOptions { PropogateCompletion = true });

// push all of the items into the dataflow.
a.Post(new Input());
a.Complete();

// wait for it all to complete.
await c.Completion;

我可能误解了,但为什么不将你的 "iteration" 移到它自己的函数中,然后使用 Task.WhenAll 并行等待所有迭代。

public async Task MyAsyncWrapper()
{
  var allTasks = Requests.Select(ProcessRequest);

  await Task.WhenAll(allTasks);
}

private async Task ProcessRequest(Request request)
{
    KeyValuePair<bool, string> Result = await this.ExecuteAsync(request);

    if (Result.Key == true)
    {
      await this.DoSomethingElseAsync(request.Id, request.Name, Result.Value);
      Console.WriteLine("COMPLETED");
    }
}