如何等待具有特定并发级别的 IAsyncEnumerable<Task<T>> 的结果

How to await the results of an IAsyncEnumerable<Task<T>>, with a specific level of concurrency

我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

上面的方法AsyncEnumerable.RangeSelect是从System.Linq.Async包中提供的。

我想要的结果是结果流,表示为 IAsyncEnumerable<string>。结果必须以与原始任务相同的顺序流式传输。此外,必须限制流的枚举,以便在任何给定时间不超过指定数量的活动任务。

我想要一个 IAsyncEnumerable<Task<T>> 类型的扩展方法形式的解决方案,这样我就可以多次链接它并形成一个处理管道,在功能上与 TPL Dataflow 类似管道,但表达流利。以下是所需扩展方法的签名:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

也接受 CancellationToken 作为参数将是一个不错的功能。


更新: 为了完整起见,我包括了一个通过链接两次 AwaitResults 方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

预期输出:

Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20


注意:回想起来AwaitResults方法应该命名为Merge,而concurrencyLevel参数应该命名为maxConcurrent,因为它的功能类似于生成 IAsyncEnumerable<T>Merge operator that exists in the Rx library. The System.Interactive.Async package does include an operator named Merge,但其重载的 none 对 IAsyncEnumerable<Task<T>> 源进行操作。它在 IEnumerable<IAsyncEnumerable<TSource>>IAsyncEnumerable<IAsyncEnumerable<TSource>> 源上运行。还可以添加参数 bufferCapacity,以便显式控制 awaiting/merging 操作所需的缓冲区大小。

这是我对 AwaitResults 方法的实现。它基于用作异步队列的 SemaphoreSlim for controlling the concurrency level, and on a Channel<Task<TResult>>。源 IAsyncEnumerable<Task<TResult>> 的枚举发生在一个即发即弃任务(馈线)中,它将热门任务推送到通道。它还为释放信号量的每个任务附加了一个延续。

该方法的最后一部分是 yielding 循环,其中任务从通道中一个一个地出列,然后依次等待。这样,结果的生成顺序与源流中的任务顺序相同。

此实现要求每个任务等待两次,这意味着它不能用于 IAsyncEnumerable<ValueTask<TResult>> 类型的源,因为 ValueTask can only be awaited once.

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

一个重要的细节是围绕最终屈服循环的 try-finally 块。如果方法的调用者过早地放弃了结果流的枚举,则这是必需的。在那种情况下,源流的枚举也应该终止,并且这个终止使用 CancellationTokenSource 向后传播。没有它,feeder 任务永远不会完成,对象永远不会被垃圾收集,内存也会泄漏。

注意:取消cancellationToken可能不会立即取消整个操作。为了获得最大的响应速度,应该使用相同的 cancellationToken 来取消单个任务。