有没有实现IAsyncEnumerable的类似Queue的C#class?
Is there a C# class like Queue that implements IAsyncEnumerable?
Queue
和 ConcurrentQueue
都实现了 IEnumerable
但没有实现 IAsyncEnumerable
。 NuGet 上是否有可用的标准 class 或 class 实现 IAsyncEnumerable
,这样,如果队列为空,则 MoveNextAsync
的结果在添加下一个内容之前不会完成要排队吗?
如果您使用的是 .NET Core 平台,则至少有两个内置选项:
System.Threading.Tasks.Dataflow.BufferBlock<T>
class, part of the TPL Dataflow 图书馆。它本身不实现 IAsyncEnumerable<T>
,但它公开了可等待的 OutputAvailableAsync()
方法,实现 ToAsyncEnumerable
扩展方法变得微不足道。
System.Threading.Channels.Channel<T>
class, the core component of the Channels 图书馆。它通过其公开了一个 IAsyncEnumerable<T>
实现
Reader.ReadAllAsync()
¹ 方法。
两者 类 也可用于 .NET Framework,通过安装 nuget 包(每个包不同)。
针对 BufferBlock<T>
的 IAsyncEnumerable<T>
的实现:
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IReceivableSourceBlock<T> source,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (source.TryReceive(out T item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
¹(不适用于 .NET Framework,但在 similar way 中很容易实现)
Queue
和 ConcurrentQueue
都实现了 IEnumerable
但没有实现 IAsyncEnumerable
。 NuGet 上是否有可用的标准 class 或 class 实现 IAsyncEnumerable
,这样,如果队列为空,则 MoveNextAsync
的结果在添加下一个内容之前不会完成要排队吗?
如果您使用的是 .NET Core 平台,则至少有两个内置选项:
System.Threading.Tasks.Dataflow.BufferBlock<T>
class, part of the TPL Dataflow 图书馆。它本身不实现IAsyncEnumerable<T>
,但它公开了可等待的OutputAvailableAsync()
方法,实现ToAsyncEnumerable
扩展方法变得微不足道。System.Threading.Channels.Channel<T>
class, the core component of the Channels 图书馆。它通过其公开了一个IAsyncEnumerable<T>
实现Reader.ReadAllAsync()
¹ 方法。
两者 类 也可用于 .NET Framework,通过安装 nuget 包(每个包不同)。
针对 BufferBlock<T>
的 IAsyncEnumerable<T>
的实现:
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IReceivableSourceBlock<T> source,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (source.TryReceive(out T item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
¹(不适用于 .NET Framework,但在 similar way 中很容易实现)