如何打破 Parallel.ForEachAsync 循环,而不是取消它?

How to break the Parallel.ForEachAsync loop, not cancel it?

在 .NET 5 中,我们有 Parallel.ForEach,您可以使用 ParallelLoopState.Break() 方法来停止处理额外的迭代。允许当前的完成处理。

但是新的 .NET 6 Parallel.ForEachAsync 没有 ParallelLoopState class 所以我们不能像 Parallel.ForEach 那样破坏它。那么有没有办法在 ForEachAsync 中执行相同的中断功能? CancellationToken 传递给 func 我认为这不是正确的方法,因为您没有尝试取消 运行 循环而是阻止其他迭代开始。

类似于此功能但对于异步版本:

int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
    (file, state) =>
    {
        Interlocked.Increment(ref count);
        if (count >= MaxFilesToProcess)
        {
            state.Break();
        }
...

作为一种变通方法,我可能可以在 TSource 上使用 .Take([xx]),然后再将其传递到并行循环中,但这可能不是中断复杂条件的选项。

异步 API Parallel.ForEachAsync 不提供其同步副本的 Stop/Break 功能。

复制此功能的一种方法是将 bool 标志与 TakeWhile LINQ 运算符结合使用:

bool breakFlag = false;
await Parallel.ForEachAsync(
    source.TakeWhile(_ => !Volatile.Read(ref breakFlag)),
    async (item, ct) =>
{
    // ...
    if (condition) Volatile.Write(ref breakFlag, true);
    // ...
});

Parallel.ForEachAsync 不会像 Parallel.ForEach 那样积极缓冲源序列中的元素,因此一旦满足条件,就不会启动更多异步操作。

如果 source 是一个异步可枚举 (IAsyncEnumerable<T>),则有一个兼容的 TakeWhile operator with identical functionality in the System.Linq.Async 包。