如何对 IEnumerable 的所有元素执行 Polly 策略,并在遇到第一个未处理的异常时停止

How to execute a Polly policy for all elements of an IEnumerable, and stop on first unhandled exception

Polly library, for example Bulkhead, Retry etc, contain a method ExecuteAsync with many overloads (18), but none of them allows to execute the policy for all elements of an IEnumerable and gather the results. It seems that the whole library is focused on the goal of executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fix this omission by implementing an extension method for all Polly policies (all implementations of the IAsyncPolicy接口的策略),签名如下:

public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
    this IAsyncPolicy policy,
    IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> action,
    bool continueOnCapturedContext = false,
    bool onErrorContinue = false)

continueOnCapturedContext参数控制是否在捕获的同步上下文上继续,应该直接传递 原生 ExecuteAsync 方法:

Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
    Func<CancellationToken, Task<TResult>> action,
    CancellationToken cancellationToken,
    bool continueOnCapturedContext);

onErrorContinue 参数是这个问题最重要的方面,因为它控制策略失败时的行为。我的意图是将此扩展方法与数千个元素一起使用,如果出现任何不符合我的政策 expected/handled 的异常,我想迅速而优雅地终止整个执行。如果参数 onErrorContinue 具有默认值 false,第一个未处理的异常应该导致所有未决操作的取消,并且整个执行应该在所有开始的操作完成后立即终止。在 onErrorContinue: true 的相反情况下,所有元素都应由策略处理。最后,所有异常都应该被传播,捆绑在 AggregateException 中,独立于 onErrorContinue 值。

如何实现这个扩展方法?

该方法的假设使用场景:

var policy = Policy
    .BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
    .WrapAsync(Policy
        .Handle<HttpRequestException>()
        .WaitAndRetryAsync(retryCount: 3,
            sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
    );

var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls, async url =>
{
    await Task.Delay(500); // Simulate a web request
    lock (random) if (random.NextDouble() < 0.66)
        throw new HttpRequestException($"Url #{url} failed");
    return url;
}, onErrorContinue: false);

¹ 这在生产中应该很少发生,但在开发过程中可能会经常发生,并且可能会影响生产力。

这是我对 ExecuteAsync 方法的实现。 CancellationTokenSource 用于在发生异常时取消挂起的操作。当有更重要的异常需要传播时,Task.WhenAll 可以很好地忽略 OperationCanceledException。最后 Task.WhenAll 任务在未被 awaited 的情况下返回,以保留所有异常。

public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
    this IAsyncPolicy policy,
    IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> action,
    bool continueOnCapturedContext = false,
    bool onErrorContinue = false)
{
    // Arguments validation omitted
    var cts = new CancellationTokenSource();
    var token = !onErrorContinue ? cts.Token : default;
    var tasks = source.Select(async (item) =>
    {
        try
        {
            return await policy.ExecuteAsync(async _ =>
            {
                return await action(item);
            }, token, continueOnCapturedContext);
        }
        catch
        {
            if (!onErrorContinue) cts.Cancel();
            throw;
        }
    }).ToArray();
    var whenAll = Task.WhenAll(tasks);
    _ = whenAll.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
    return whenAll;
}

模拟 Task.WhenAll 行为,在这种情况下是可取的, otherwise (with async/await). So I am happily avoiding this trouble by using a small and dirty ContinueWith, in order to finally dispose CancellationTokenSource

提供了另一种处理多个异常的方法here。此解决方案传播了一个嵌套的 AggregateException,这听起来很难看,但在实践中没关系,因为 await 异步方法无论如何都消除了一层嵌套。