如何等待 IAsyncEnumerable<> 的所有结果?
How to await all results from an IAsyncEnumerable<>?
我正在研究 C# 8.0 中的新 IAsyncEnumerable<T>
内容。假设我在某处有一些我想使用的方法:
public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }
我知道我可以使用 await foreach...
语法。但是假设我的消费者在继续之前需要从该函数获得 all 结果。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够做类似的事情:
// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync();
正确的做法是什么?
根据@DmitryBychenko 的评论,我写了一个扩展来实现我想要的:
public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
if (null == asyncEnumerable)
throw new ArgumentNullException(nameof(asyncEnumerable));
var list = new List<T>();
await foreach (var t in asyncEnumerable)
{
list.Add(t);
}
return list;
}
我只是有点惊讶这不是 C# 8.0 原生提供的...这似乎是一个非常明显的需求。
首先警告:根据定义,异步流可能永远不会结束并一直产生结果,直到应用程序终止。这 已经 在 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。
在异步流上使用 ToListAsync
可能会产生意想不到的后果。
像这样的运算符已经可以通过 System.Linq.Async 包获得。
可以通过 ToListAsync 使用整个流。代码*看似简单,但隐藏了一些有趣的问题:
public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (source is IAsyncIListProvider<TSource> listProvider)
return listProvider.ToListAsync(cancellationToken);
return Core(source, cancellationToken);
static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
var list = new List<TSource>();
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
list.Add(item);
}
return list;
}
}
首先,它returns一个ValueTask
。其次,它确保观察到取消并使用 ConfigureAwait(false)
来防止死锁。最后,如果源已经提供了自己的 ToListAsync
实现,则运营商会遵从它。
作为一个选项,您可以使用 ToArrayAsync
extension method, defined in System.Linq.Async
包
public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
根据定义,它扩展了 IAsyncEnumerable
接口
我正在研究 C# 8.0 中的新 IAsyncEnumerable<T>
内容。假设我在某处有一些我想使用的方法:
public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }
我知道我可以使用 await foreach...
语法。但是假设我的消费者在继续之前需要从该函数获得 all 结果。在继续之前等待所有结果的最佳语法是什么?换句话说,我希望能够做类似的事情:
// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync();
正确的做法是什么?
根据@DmitryBychenko 的评论,我写了一个扩展来实现我想要的:
public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
if (null == asyncEnumerable)
throw new ArgumentNullException(nameof(asyncEnumerable));
var list = new List<T>();
await foreach (var t in asyncEnumerable)
{
list.Add(t);
}
return list;
}
我只是有点惊讶这不是 C# 8.0 原生提供的...这似乎是一个非常明显的需求。
首先警告:根据定义,异步流可能永远不会结束并一直产生结果,直到应用程序终止。这 已经 在 SignalR 或 gRPC 中使用。轮询循环也以这种方式工作。
在异步流上使用 ToListAsync
可能会产生意想不到的后果。
像这样的运算符已经可以通过 System.Linq.Async 包获得。
可以通过 ToListAsync 使用整个流。代码*看似简单,但隐藏了一些有趣的问题:
public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (source is IAsyncIListProvider<TSource> listProvider)
return listProvider.ToListAsync(cancellationToken);
return Core(source, cancellationToken);
static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
var list = new List<TSource>();
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
list.Add(item);
}
return list;
}
}
首先,它returns一个ValueTask
。其次,它确保观察到取消并使用 ConfigureAwait(false)
来防止死锁。最后,如果源已经提供了自己的 ToListAsync
实现,则运营商会遵从它。
作为一个选项,您可以使用 ToArrayAsync
extension method, defined in System.Linq.Async
包
public static ValueTask<TSource[]> ToArrayAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
根据定义,它扩展了 IAsyncEnumerable
接口