如何等待 IAsyncEnumerable<> 的所有结果?

How to await all results from an IAsyncEnumerable<>?

我正在研究 C# 8.0 中的新 IAsyncEnumerable<T> 内容。假设我在某处有一些我想使用的方法:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

我知道我可以使用 await foreach... 语法。但是假设我的消费者在继续之前需要从该函数获得 all 结果。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够做类似的事情:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

正确的做法是什么?

根据@DmitryBychenko 的评论,我写了一个扩展来实现我想要的:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    {
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        {
            list.Add(t);
        }

        return list;
    }

我只是有点惊讶这不是 C# 8.0 原生提供的...这似乎是一个非常明显的需求。

首先警告:根据定义,异步流可​​能永远不会结束并一直产生结果,直到应用程序终止。这 已经 在 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。

在异步流上使用 ToListAsync 可能会产生意想不到的后果。


像这样的运算符已经可以通过 System.Linq.Async 包获得。

可以通过 ToListAsync 使用整个流。代码*看似简单,但隐藏了一些有趣的问题:

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

首先,它returns一个ValueTask。其次,它确保观察到取消并使用 ConfigureAwait(false) 来防止死锁。最后,如果源已经提供了自己的 ToListAsync 实现,则运营商会遵从它。

作为一个选项,您可以使用 ToArrayAsync extension method, defined in System.Linq.Async

public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)

根据定义,它扩展了 IAsyncEnumerable 接口