停止 Parallel.ForEachAsync

Stop Parallel.ForEachAsync

在 C# 中,我对 停止 一个 Parallel.ForEachAsync 循环感兴趣(考虑 differences between Stop and Break);对于 Parallel.ForEach 我可以执行以下操作:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

但是,由于我有一个进程需要异步执行,所以我转而使用Parallel.ForEachAsyncForEachAsync 没有 Stop() 方法,我可以 break 循环如下,但我想知道这是否是破坏外观的最有效方法(在换句话说,循环需要在收到取消请求时尽快停止

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});

您将需要这样的东西:

await Parallel.ForEachAsync(items, async (item, state) =>
{
    await ProcessAsync(item, cancellationToken);
});


async Task ProcessAsync(string item, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        //Process
    }
}

Parallel.ForEachAsync lambda has a CancellationToken as its second parameter. This token is supplied by the API, it's not the same token that you have passed in the ParallelOptions. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, then the best you can do is to call the ThrowIfCancellationRequested 在 lambda 内部的关键位置:

var cts = new CancellationTokenSource();
var options = new ParallelOptions() { CancellationToken = cts.Token };

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        //...
        ct.ThrowIfCancellationRequested();
        //...
        await ProcessAsync(item, ct);
        //...
        ct.ThrowIfCancellationRequested();
        //...
    });
}
catch (OperationCanceledException ex)
{
    // ...
}

在 lambda 中作为参数提供的令牌,即上例中的 ct,不仅在 ParallelOptions.CancellationToken 被取消时被取消,而且在 ProcessAsync 操作的情况下也会被取消失败了。这种机制允许更快地传播异常。并行循环不会在错误发生时立即完成,因为它遵循不允许 fire-and-forget 操作的原则。循环内部启动的所有操作都必须在整个循环成功或失败之前完成。 lambda 中的令牌可以将这种延迟降至最低。