Async generator, previous iterations await a future iteration?

How do I write a generator in C# that yields an enumerable of tasks, where the tasks will be completed at different times?

The reason I want this is that I'm working through very long iterables of input, and every so often I accumulate enough data from those inputs to send a batched API request and finalize my outputs.

Pseudocode:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var queuedInputs = new Queue<Input>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        // yield return waiting task for this input
        queuedInputs.Enqueue(input);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0;
            GetFromAPI(queuedInputs).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queuedInputs.Count > 0)
                {
                    var batchResult = batchResults.Dequeue();
                    var historicalInput = queuedInputs.Dequeue();
                    var output = MakeOutput(historicalInput, batchResult);
                    // resolve earlier input's task with this output
                }
            });
        }
    }
}

Assuming MyGenerator() returns a List<Task<T>> and the number of tasks is relatively small (even a few hundred is fine), you can use Task.WhenAny(), which returns the first Task that completes. Remove that Task from the list, process its result, and then move on to the next one:

var tasks = MyGenerator();
while (tasks.Count > 0) {
    var t = await Task.WhenAny(tasks);
    tasks.Remove(t);

    var result = await t; // this won't actually wait since the task is already done
    // Do something with result
}

There is a good discussion of this in an article by Stephen Toub, which explains it in more detail and offers alternatives if your list of tasks numbers in the thousands: Processing tasks as they complete

There is also this article, though I think Stephen's is better written: Process asynchronous tasks as they complete (C#)
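For larger numbers of tasks, the repeated Task.WhenAny scan of the list becomes O(N²). Below is a minimal sketch of the interleaving idea described in Stephen Toub's article: pre-allocate one TaskCompletionSource per input task and complete them in finish order, so the caller can simply await the returned tasks in sequence (MyGenerator is the same hypothetical method as above).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static Task<T>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();

    // One "bucket" per input task, handed out in completion order.
    var buckets = new TaskCompletionSource<T>[inputTasks.Count];
    var results = new Task<T>[buckets.Length];
    for (int i = 0; i < buckets.Length; i++)
    {
        buckets[i] = new TaskCompletionSource<T>();
        results[i] = buckets[i].Task;
    }

    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            // Claim the next bucket and mirror the completed task's state into it.
            var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
                bucket.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled)
                bucket.TrySetCanceled();
            else
                bucket.TrySetResult(completed.Result);
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    return results;
}

// Usage: the returned tasks complete in the order the originals finish.
foreach (var task in Interleaved(MyGenerator()))
{
    var result = await task;
    // Do something with result
}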

One approach is to use the TPL Dataflow library. This library offers a variety of components named "blocks" (TransformBlock, ActionBlock etc), where each block processes its input data and then propagates the results to the next block. The blocks are linked together so that the completion of the previous block in the pipeline triggers the completion of the next block, and so on, until the final block, which is usually an ActionBlock<T> with no output. Here is an example:

using System.Threading.Tasks.Dataflow; // namespace for TransformBlock, BatchBlock, ActionBlock

var block1 = new TransformBlock<int, string>(item =>
{
    Thread.Sleep(1000); // Simulate synchronous work
    return item.ToString();
}, new()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false
});

var block2 = new BatchBlock<string>(batchSize: 10);

var block3 = new ActionBlock<string[]>(async batch =>
{
    await Task.Delay(1000); // Simulate asynchronous work
}); // The default MaxDegreeOfParallelism is 1

block1.LinkTo(block2, new() { PropagateCompletion = true });
block2.LinkTo(block3, new() { PropagateCompletion = true });

// Provide some input in the pipeline
block1.Post(1);
block1.Post(2);
block1.Post(3);
block1.Post(4);
block1.Post(5);

block1.Complete(); // Mark the first block as completed
await block3.Completion; // Await the completion of the last block

The TPL Dataflow library is powerful and flexible, but it has a weak point when it comes to exception propagation. If block3 fails, there is no built-in way to signal block1 to stop working. You can read more about this issue here. If you don't expect your blocks to fail often, this may not be a serious problem.
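One possible mitigation, sketched below under the assumption that cancelling the whole pipeline on failure is acceptable: give every block the same CancellationToken through its options, and cancel the shared CancellationTokenSource if the final block faults, so the upstream blocks stop as well.

var cts = new CancellationTokenSource();

var block1 = new TransformBlock<int, string>(item =>
{
    Thread.Sleep(1000); // Simulate synchronous work
    return item.ToString();
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

var block3 = new ActionBlock<string[]>(batch =>
{
    throw new Exception("Oops!"); // Simulate a failure in the final block
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

// If the final block fails, cancel the shared token so the rest of the pipeline stops.
_ = block3.Completion.ContinueWith(
    _ => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);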

The shape of your solution will be driven by the shape of your problem. I have a couple of questions, because your problem domain looks a little unusual:

  1. Are all of your inputs known up front? The (synchronous) IEnumerable<Input> implies they are.
  2. Are you sure you want to wait for a full batch of inputs before sending any query? If you batch by 10 but have 55 inputs, what about the "remainder"?

Assuming you do have synchronous inputs, and you do want a batch for the remainder, you can accumulate all the inputs right away, batch them, and then iterate over the batches, yielding the outputs asynchronously:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  foreach (var batchedInput in inputs.Batch(10))
  {
    var batchResults = await GetFromAPI(batchedInput);
    for (int i = 0; i != batchedInput.Count; ++i)
      yield return MakeOutput(batchedInput[i], batchResults[i]);
  }
}

public static IEnumerable<IReadOnlyList<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
{
  List<TSource>? batch = null;
  foreach (var item in source)
  {
    batch ??= new List<TSource>(capacity: size);
    batch.Add(item);
    if (batch.Count == size)
    {
      yield return batch;
      batch = null;
    }
  }

  if (batch?.Count > 0)
    yield return batch;
}

Update:

If you want the API calls to start immediately, you can move them out of the loop:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  var batchedInputs = inputs.Batch(10).ToList();
  var apiCallTasks = batchedInputs.Select(GetFromAPI).ToList();
  for (int i = 0; i != apiCallTasks.Count; ++i)
  {
    var batchResults = await apiCallTasks[i];
    var batchedInput = batchedInputs[i];
    for (int j = 0; j != batchedInput.Count; ++j)
      yield return MakeOutput(batchedInput[j], batchResults[j]);
  }
}

Using a TaskCompletionSource:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var tcss = new List<TaskCompletionSource<Output>>();
    var queue = new Queue<(Input, TaskCompletionSource<Output>)>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        var tcs = new TaskCompletionSource<Output>();
        queue.Enqueue((input, tcs));
        tcss.Add(tcs);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0;
            var queueClone = new Queue<(Input, TaskCompletionSource<Output>)>(queue);
            queue.Clear();
            GetFromAPI(queueClone.Select(x => x.Item1)).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queueClone.Count > 0)
                {
                    var batchResult = batchResults.Dequeue();
                    var (queuedInput, queuedTcs) = queueClone.Dequeue();
                    var output = MakeOutput(queuedInput, batchResult);
                    queuedTcs.SetResult(output);
                }
            });
        }
    }
    GetFromAPI(queue.Select(x => x.Item1)).ContinueWith((apiTask) => {
        Queue<BatchResult> batchResults = apiTask.Result;
        while (queue.Count > 0)
        {
            var batchResult = batchResults.Dequeue();
            var (queuedInput, queuedTcs) = queue.Dequeue();
            var output = MakeOutput(queuedInput, batchResult);
            queuedTcs.SetResult(output);
        }
    });
    foreach (var tcs in tcss)
    {
        yield return tcs.Task;
    }
}
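For completeness, a small consumption sketch (an illustration, not part of the answer): each task yielded by Process completes once its batch's API call has finished, so the caller can await the tasks in yield order, or feed them to the Task.WhenAny loop shown earlier to handle them in completion order.

// Await the yielded tasks in order; each completes when its batch has been processed.
foreach (var outputTask in Process(inputs))
{
    var output = await outputTask;
    // Do something with output
}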