如何对 IEnumerable 的所有元素执行 Polly 策略,并在遇到第一个未处理的异常时停止
How to execute a Polly policy for all elements of an IEnumerable, and stop on first unhandled exception
Polly library, for example Bulkhead
, Retry
etc, contain a method ExecuteAsync
with many overloads (18), but none of them allows to execute the policy for all elements of an IEnumerable
and gather the results. It seems that the whole library is focused on the goal of executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fix this omission by implementing an extension method for all Polly policies (all implementations of the IAsyncPolicy
接口的策略),签名如下:
public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
this IAsyncPolicy policy,
IEnumerable<TSource> source,
Func<TSource, Task<TResult>> action,
bool continueOnCapturedContext = false,
bool onErrorContinue = false)
continueOnCapturedContext
参数控制是否在捕获的同步上下文上继续,应该直接传递
原生 ExecuteAsync
方法:
Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
Func<CancellationToken, Task<TResult>> action,
CancellationToken cancellationToken,
bool continueOnCapturedContext);
onErrorContinue
参数是这个问题最重要的方面,因为它控制策略失败时的行为。我的意图是将此扩展方法与数千个元素一起使用,如果出现任何不符合我的政策 expected/handled 的异常,我想迅速而优雅地终止整个执行。如果参数 onErrorContinue
具有默认值 false
,第一个未处理的异常应该导致所有未决操作的取消,并且整个执行应该在所有开始的操作完成后立即终止。在 onErrorContinue: true
的相反情况下,所有元素都应由策略处理。最后,所有异常都应该被传播,捆绑在 AggregateException
中,独立于 onErrorContinue
值。
如何实现这个扩展方法?
该方法的假设使用场景:
var policy = Policy
.BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
.WrapAsync(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
);
var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls, async url =>
{
await Task.Delay(500); // Simulate a web request
lock (random) if (random.NextDouble() < 0.66)
throw new HttpRequestException($"Url #{url} failed");
return url;
}, onErrorContinue: false);
¹ 这在生产中应该很少发生,但在开发过程中可能会经常发生,并且可能会影响生产力。
这是我对 ExecuteAsync
方法的实现。 CancellationTokenSource
用于在发生异常时取消挂起的操作。当有更重要的异常需要传播时,Task.WhenAll
可以很好地忽略 OperationCanceledException
。最后 Task.WhenAll
任务在未被 await
ed 的情况下返回,以保留所有异常。
public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
this IAsyncPolicy policy,
IEnumerable<TSource> source,
Func<TSource, Task<TResult>> action,
bool continueOnCapturedContext = false,
bool onErrorContinue = false)
{
// Arguments validation omitted
var cts = new CancellationTokenSource();
var token = !onErrorContinue ? cts.Token : default;
var tasks = source.Select(async (item) =>
{
try
{
return await policy.ExecuteAsync(async _ =>
{
return await action(item);
}, token, continueOnCapturedContext);
}
catch
{
if (!onErrorContinue) cts.Cancel();
throw;
}
}).ToArray();
var whenAll = Task.WhenAll(tasks);
_ = whenAll.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
return whenAll;
}
模拟 Task.WhenAll
行为,在这种情况下是可取的, otherwise (with async/await). So I am happily avoiding this trouble by using a small and dirty ContinueWith
, in order to finally dispose CancellationTokenSource
。
提供了另一种处理多个异常的方法here。此解决方案传播了一个嵌套的 AggregateException
,这听起来很难看,但在实践中没关系,因为 await
异步方法无论如何都消除了一层嵌套。
Polly library, for example Bulkhead
, Retry
etc, contain a method ExecuteAsync
with many overloads (18), but none of them allows to execute the policy for all elements of an IEnumerable
and gather the results. It seems that the whole library is focused on the goal of executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fix this omission by implementing an extension method for all Polly policies (all implementations of the IAsyncPolicy
接口的策略),签名如下:
public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
this IAsyncPolicy policy,
IEnumerable<TSource> source,
Func<TSource, Task<TResult>> action,
bool continueOnCapturedContext = false,
bool onErrorContinue = false)
continueOnCapturedContext
参数控制是否在捕获的同步上下文上继续,应该直接传递
原生 ExecuteAsync
方法:
Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
Func<CancellationToken, Task<TResult>> action,
CancellationToken cancellationToken,
bool continueOnCapturedContext);
onErrorContinue
参数是这个问题最重要的方面,因为它控制策略失败时的行为。我的意图是将此扩展方法与数千个元素一起使用,如果出现任何不符合我的政策 expected/handled 的异常,我想迅速而优雅地终止整个执行。如果参数 onErrorContinue
具有默认值 false
,第一个未处理的异常应该导致所有未决操作的取消,并且整个执行应该在所有开始的操作完成后立即终止。在 onErrorContinue: true
的相反情况下,所有元素都应由策略处理。最后,所有异常都应该被传播,捆绑在 AggregateException
中,独立于 onErrorContinue
值。
如何实现这个扩展方法?
该方法的假设使用场景:
var policy = Policy
.BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
.WrapAsync(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
);
var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls, async url =>
{
await Task.Delay(500); // Simulate a web request
lock (random) if (random.NextDouble() < 0.66)
throw new HttpRequestException($"Url #{url} failed");
return url;
}, onErrorContinue: false);
¹ 这在生产中应该很少发生,但在开发过程中可能会经常发生,并且可能会影响生产力。
这是我对 ExecuteAsync
方法的实现。 CancellationTokenSource
用于在发生异常时取消挂起的操作。当有更重要的异常需要传播时,Task.WhenAll
可以很好地忽略 OperationCanceledException
。最后 Task.WhenAll
任务在未被 await
ed 的情况下返回,以保留所有异常。
public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
this IAsyncPolicy policy,
IEnumerable<TSource> source,
Func<TSource, Task<TResult>> action,
bool continueOnCapturedContext = false,
bool onErrorContinue = false)
{
// Arguments validation omitted
var cts = new CancellationTokenSource();
var token = !onErrorContinue ? cts.Token : default;
var tasks = source.Select(async (item) =>
{
try
{
return await policy.ExecuteAsync(async _ =>
{
return await action(item);
}, token, continueOnCapturedContext);
}
catch
{
if (!onErrorContinue) cts.Cancel();
throw;
}
}).ToArray();
var whenAll = Task.WhenAll(tasks);
_ = whenAll.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
return whenAll;
}
模拟 Task.WhenAll
行为,在这种情况下是可取的,ContinueWith
, in order to finally dispose CancellationTokenSource
。
提供了另一种处理多个异常的方法here。此解决方案传播了一个嵌套的 AggregateException
,这听起来很难看,但在实践中没关系,因为 await
异步方法无论如何都消除了一层嵌套。