如何等待具有特定并发级别的 IAsyncEnumerable<Task<T>> 的结果
How to await the results of an IAsyncEnumerable<Task<T>>, with a specific level of concurrency
我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
上面的方法AsyncEnumerable.Range
和Select
是从System.Linq.Async
包中提供的。
我想要的结果是结果流,表示为 IAsyncEnumerable<string>
。结果必须以与原始任务相同的顺序流式传输。此外,必须限制流的枚举,以便在任何给定时间不超过指定数量的活动任务。
我想要一个 IAsyncEnumerable<Task<T>>
类型的扩展方法形式的解决方案,这样我就可以多次链接它并形成一个处理管道,在功能上与 TPL Dataflow 类似管道,但表达流利。以下是所需扩展方法的签名:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
也接受 CancellationToken
作为参数将是一个不错的功能。
更新: 为了完整起见,我包括了一个通过链接两次 AwaitResults
方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
注意:回想起来AwaitResults
方法应该命名为Merge
,而concurrencyLevel
参数应该命名为maxConcurrent
,因为它的功能类似于生成 IAsyncEnumerable<T>
的 Merge
operator that exists in the Rx library. The System.Interactive.Async package does include an operator named Merge
,但其重载的 none 对 IAsyncEnumerable<Task<T>>
源进行操作。它在 IEnumerable<IAsyncEnumerable<TSource>>
和 IAsyncEnumerable<IAsyncEnumerable<TSource>>
源上运行。还可以添加参数 bufferCapacity
,以便显式控制 awaiting/merging 操作所需的缓冲区大小。
这是我对 AwaitResults
方法的实现。它基于用作异步队列的 SemaphoreSlim
for controlling the concurrency level, and on a Channel<Task<TResult>>
。源 IAsyncEnumerable<Task<TResult>>
的枚举发生在一个即发即弃任务(馈线)中,它将热门任务推送到通道。它还为释放信号量的每个任务附加了一个延续。
该方法的最后一部分是 yielding 循环,其中任务从通道中一个一个地出列,然后依次等待。这样,结果的生成顺序与源流中的任务顺序相同。
此实现要求每个任务等待两次,这意味着它不能用于 IAsyncEnumerable<ValueTask<TResult>>
类型的源,因为 ValueTask
can only be awaited once.
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel = 1,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (concurrencyLevel < 1)
throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));
var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken);
// Feeder task: fire and forget
_ = Task.Run(async () =>
{
try
{
await foreach (var task in source
.WithCancellation(completionCts.Token).ConfigureAwait(false))
{
HandleTaskCompletion(task);
await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
.ConfigureAwait(false);
await semaphore.WaitAsync(completionCts.Token)
.ConfigureAwait(false); // Acquire before MoveNextAsync
}
tasksChannel.Writer.Complete();
}
catch (Exception ex)
{
tasksChannel.Writer.Complete(ex);
}
});
async void HandleTaskCompletion(Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
// Ignore exceptions here
}
finally
{
semaphore.Release();
}
}
try
{
while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
.ConfigureAwait(false))
{
while (tasksChannel.Reader.TryRead(out var task))
{
yield return await task.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
}
}
finally // Happens when the caller disposes the output enumerator
{
completionCts.Cancel();
}
}
一个重要的细节是围绕最终屈服循环的 try-finally 块。如果方法的调用者过早地放弃了结果流的枚举,则这是必需的。在那种情况下,源流的枚举也应该终止,并且这个终止使用 CancellationTokenSource
向后传播。没有它,feeder 任务永远不会完成,对象永远不会被垃圾收集,内存也会泄漏。
注意:取消cancellationToken
可能不会立即取消整个操作。为了获得最大的响应速度,应该使用相同的 cancellationToken
来取消单个任务。
我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
上面的方法AsyncEnumerable.Range
和Select
是从System.Linq.Async
包中提供的。
我想要的结果是结果流,表示为 IAsyncEnumerable<string>
。结果必须以与原始任务相同的顺序流式传输。此外,必须限制流的枚举,以便在任何给定时间不超过指定数量的活动任务。
我想要一个 IAsyncEnumerable<Task<T>>
类型的扩展方法形式的解决方案,这样我就可以多次链接它并形成一个处理管道,在功能上与 TPL Dataflow 类似管道,但表达流利。以下是所需扩展方法的签名:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
也接受 CancellationToken
作为参数将是一个不错的功能。
更新: 为了完整起见,我包括了一个通过链接两次 AwaitResults
方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
注意:回想起来AwaitResults
方法应该命名为Merge
,而concurrencyLevel
参数应该命名为maxConcurrent
,因为它的功能类似于生成 IAsyncEnumerable<T>
的 Merge
operator that exists in the Rx library. The System.Interactive.Async package does include an operator named Merge
,但其重载的 none 对 IAsyncEnumerable<Task<T>>
源进行操作。它在 IEnumerable<IAsyncEnumerable<TSource>>
和 IAsyncEnumerable<IAsyncEnumerable<TSource>>
源上运行。还可以添加参数 bufferCapacity
,以便显式控制 awaiting/merging 操作所需的缓冲区大小。
这是我对 AwaitResults
方法的实现。它基于用作异步队列的 SemaphoreSlim
for controlling the concurrency level, and on a Channel<Task<TResult>>
。源 IAsyncEnumerable<Task<TResult>>
的枚举发生在一个即发即弃任务(馈线)中,它将热门任务推送到通道。它还为释放信号量的每个任务附加了一个延续。
该方法的最后一部分是 yielding 循环,其中任务从通道中一个一个地出列,然后依次等待。这样,结果的生成顺序与源流中的任务顺序相同。
此实现要求每个任务等待两次,这意味着它不能用于 IAsyncEnumerable<ValueTask<TResult>>
类型的源,因为 ValueTask
can only be awaited once.
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel = 1,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (concurrencyLevel < 1)
throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));
var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken);
// Feeder task: fire and forget
_ = Task.Run(async () =>
{
try
{
await foreach (var task in source
.WithCancellation(completionCts.Token).ConfigureAwait(false))
{
HandleTaskCompletion(task);
await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
.ConfigureAwait(false);
await semaphore.WaitAsync(completionCts.Token)
.ConfigureAwait(false); // Acquire before MoveNextAsync
}
tasksChannel.Writer.Complete();
}
catch (Exception ex)
{
tasksChannel.Writer.Complete(ex);
}
});
async void HandleTaskCompletion(Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
// Ignore exceptions here
}
finally
{
semaphore.Release();
}
}
try
{
while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
.ConfigureAwait(false))
{
while (tasksChannel.Reader.TryRead(out var task))
{
yield return await task.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
}
}
finally // Happens when the caller disposes the output enumerator
{
completionCts.Cancel();
}
}
一个重要的细节是围绕最终屈服循环的 try-finally 块。如果方法的调用者过早地放弃了结果流的枚举,则这是必需的。在那种情况下,源流的枚举也应该终止,并且这个终止使用 CancellationTokenSource
向后传播。没有它,feeder 任务永远不会完成,对象永远不会被垃圾收集,内存也会泄漏。
注意:取消cancellationToken
可能不会立即取消整个操作。为了获得最大的响应速度,应该使用相同的 cancellationToken
来取消单个任务。