处理并发 http 请求时的 IHttpClient Polly 超时和 WaitAndRetry 策略说明
IHttpClient Polly Timeout and WaitAndRetry policy when handling concurrent http requests Clarification
只是想问一个关于 Pollys timeout/retry 政策的问题,以及它在处理并发 http 请求时的工作方式。
根据自己的理解,超时和重试策略将应用于每个单独的 http 请求,因此如果我们有 5 个 http 请求,每个请求都会有自己的超时和重试策略,因此从每个 http 请求下面的代码将在 5 秒后超时并重试总共 4 次。
public override void Configure(IFunctionsHostBuilder builder)
{
var timeout = Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(5));
builder.Services
.AddHttpClient("PointsbetClient")
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(timeout);
}
private static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.Or<TimeoutRejectedException>()
.WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
medianFirstRetryDelay: TimeSpan.FromMilliseconds(500),
retryCount: 4));
}
现在假设我有 1000 个 http 请求,我需要对其进行 GetAsync() 调用,以便我可以抓取它们的数据,出于性能目的,我使用 await Task.WhenAll(tasks);
同时进行这些调用。由于 1000 一次请求太多,我使用 SemaphoreSlim
class 并将 MaxParallelRequests 限制为 100.
现在如何应用 Polly 重试和超时策略?它是否仍适用于这 1000 个请求中的每一个,或者它将包含 100 个请求的 1 个任务视为单个 timeout/retry 策略?根据我的理解,它仍然会处理每个单独的 http 请求并将策略应用于每个单独的 http 请求,我一直在搜索但找不到关于此的确认。
简短的回答是,它们是分开处理的。
为了了解系统的工作原理,我们必须深入了解。让我们从 AddPolicyHandler
.
开始我们的旅程吧
免责声明:为了简洁起见,我稍微编辑了代码片段。
public static IHttpClientBuilder AddPolicyHandler(this IHttpClientBuilder builder, IAsyncPolicy<HttpResponseMessage> policy)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));
if (policy == null) throw new ArgumentNullException(nameof(policy));
builder.AddHttpMessageHandler(() => new PolicyHttpMessageHandler(policy));
return builder;
}
此方法定义在PollyHttpClientBuilderExtensions
class 中,它为IHttpClientBuilder
.
提供扩展方法
如您所见,它只是将另一个 HttpMessageHandler
注册到链中。
现在,让我们看看这个特殊的处理程序长什么样子
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request == null) throw new ArgumentNullException(nameof(request));
// Guarantee the existence of a context for every policy execution,
// but only create a new one if needed.
// This allows later handlers to flow state if desired.
var cleanUpContext = false;
var context = request.GetPolicyExecutionContext();
if (context == null)
{
context = new Context();
request.SetPolicyExecutionContext(context);
cleanUpContext = true;
}
HttpResponseMessage response;
try
{
var policy = _policy ?? SelectPolicy(request);
response = await policy.ExecuteAsync((c, ct) => SendCoreAsync(request, c, ct), context, cancellationToken).ConfigureAwait(false);
}
finally
{
if (cleanUpContext)
request.SetPolicyExecutionContext(null);
}
return response;
}
这个方法是在PolicyHttpMessageHandler
里面定义的。
如您所见,这里没有发生任何异常情况
- 我们要么检索要么创建一个新的上下文
- 我们要么从注册表中检索策略,要么使用提供的策略
- 我们执行装饰
SendCoreAsync
的策略
那么,魔法发生在哪里呢?让我们跳转这个 class
的 documentation comment
All policies provided by Polly are designed to be efficient when used in a long-lived way. Certain policies such as the
Bulkhead and Circuit-Breaker maintain state and should be scoped across calls you wish to share the Bulkhead or Circuit-Breaker state.
Take care to ensure the correct lifetimes when using policies and message handlers together in custom scenarios. The extension
methods provided by PollyHttpClientBuilderExtensions
are designed to assign a long lifetime to policies
and ensure that they can be used when the handler rotation feature is active.
要了解 Retry 与 Circuit Breaker 有何不同,让我们看看他们的 Engine
的 Implementation
签名
internal static class RetryEngine
{
internal static TResult Implementation<TResult>(
Func<Context, CancellationToken, TResult> action,
Context context,
CancellationToken cancellationToken,
ExceptionPredicates shouldRetryExceptionPredicates,
ResultPredicates<TResult> shouldRetryResultPredicates,
Action<DelegateResult<TResult>, TimeSpan, int, Context> onRetry,
int permittedRetryCount = Int32.MaxValue,
IEnumerable<TimeSpan> sleepDurationsEnumerable = null,
Func<int, DelegateResult<TResult>, Context, TimeSpan> sleepDurationProvider = null)
{
...
}
}
internal class CircuitBreakerEngine
{
internal static TResult Implementation<TResult>(
Func<Context, CancellationToken, TResult> action,
Context context,
CancellationToken cancellationToken,
ExceptionPredicates shouldHandleExceptionPredicates,
ResultPredicates<TResult> shouldHandleResultPredicates,
ICircuitController<TResult> breakerController)
{
...
}
}
这里你要注意的是ICircuitController
。不同策略执行之间的CircuitStateController
base class stores the state information. One of its derived class is shared
internal readonly ICircuitController<EmptyStruct> _breakerController;
...
CircuitBreakerEngine.Implementation(
action,
context,
cancellationToken,
ExceptionPredicates,
ResultPredicates,
_breakerController);
我希望这能澄清一些事情。
只是想问一个关于 Pollys timeout/retry 政策的问题,以及它在处理并发 http 请求时的工作方式。
根据自己的理解,超时和重试策略将应用于每个单独的 http 请求,因此如果我们有 5 个 http 请求,每个请求都会有自己的超时和重试策略,因此从每个 http 请求下面的代码将在 5 秒后超时并重试总共 4 次。
public override void Configure(IFunctionsHostBuilder builder)
{
var timeout = Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(5));
builder.Services
.AddHttpClient("PointsbetClient")
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(timeout);
}
private static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.Or<TimeoutRejectedException>()
.WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
medianFirstRetryDelay: TimeSpan.FromMilliseconds(500),
retryCount: 4));
}
现在假设我有 1000 个 http 请求,我需要对其进行 GetAsync() 调用,以便我可以抓取它们的数据,出于性能目的,我使用 await Task.WhenAll(tasks);
同时进行这些调用。由于 1000 一次请求太多,我使用 SemaphoreSlim
class 并将 MaxParallelRequests 限制为 100.
现在如何应用 Polly 重试和超时策略?它是否仍适用于这 1000 个请求中的每一个,或者它将包含 100 个请求的 1 个任务视为单个 timeout/retry 策略?根据我的理解,它仍然会处理每个单独的 http 请求并将策略应用于每个单独的 http 请求,我一直在搜索但找不到关于此的确认。
简短的回答是,它们是分开处理的。
为了了解系统的工作原理,我们必须深入了解。让我们从 AddPolicyHandler
.
免责声明:为了简洁起见,我稍微编辑了代码片段。
public static IHttpClientBuilder AddPolicyHandler(this IHttpClientBuilder builder, IAsyncPolicy<HttpResponseMessage> policy)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));
if (policy == null) throw new ArgumentNullException(nameof(policy));
builder.AddHttpMessageHandler(() => new PolicyHttpMessageHandler(policy));
return builder;
}
此方法定义在PollyHttpClientBuilderExtensions
class 中,它为IHttpClientBuilder
.
如您所见,它只是将另一个 HttpMessageHandler
注册到链中。
现在,让我们看看这个特殊的处理程序长什么样子
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request == null) throw new ArgumentNullException(nameof(request));
// Guarantee the existence of a context for every policy execution,
// but only create a new one if needed.
// This allows later handlers to flow state if desired.
var cleanUpContext = false;
var context = request.GetPolicyExecutionContext();
if (context == null)
{
context = new Context();
request.SetPolicyExecutionContext(context);
cleanUpContext = true;
}
HttpResponseMessage response;
try
{
var policy = _policy ?? SelectPolicy(request);
response = await policy.ExecuteAsync((c, ct) => SendCoreAsync(request, c, ct), context, cancellationToken).ConfigureAwait(false);
}
finally
{
if (cleanUpContext)
request.SetPolicyExecutionContext(null);
}
return response;
}
这个方法是在PolicyHttpMessageHandler
里面定义的。
如您所见,这里没有发生任何异常情况
- 我们要么检索要么创建一个新的上下文
- 我们要么从注册表中检索策略,要么使用提供的策略
- 我们执行装饰
SendCoreAsync
的策略
那么,魔法发生在哪里呢?让我们跳转这个 class
的 documentation commentAll policies provided by Polly are designed to be efficient when used in a long-lived way. Certain policies such as the Bulkhead and Circuit-Breaker maintain state and should be scoped across calls you wish to share the Bulkhead or Circuit-Breaker state. Take care to ensure the correct lifetimes when using policies and message handlers together in custom scenarios. The extension methods provided by
PollyHttpClientBuilderExtensions
are designed to assign a long lifetime to policies and ensure that they can be used when the handler rotation feature is active.
要了解 Retry 与 Circuit Breaker 有何不同,让我们看看他们的 Engine
的 Implementation
签名
internal static class RetryEngine
{
internal static TResult Implementation<TResult>(
Func<Context, CancellationToken, TResult> action,
Context context,
CancellationToken cancellationToken,
ExceptionPredicates shouldRetryExceptionPredicates,
ResultPredicates<TResult> shouldRetryResultPredicates,
Action<DelegateResult<TResult>, TimeSpan, int, Context> onRetry,
int permittedRetryCount = Int32.MaxValue,
IEnumerable<TimeSpan> sleepDurationsEnumerable = null,
Func<int, DelegateResult<TResult>, Context, TimeSpan> sleepDurationProvider = null)
{
...
}
}
internal class CircuitBreakerEngine
{
internal static TResult Implementation<TResult>(
Func<Context, CancellationToken, TResult> action,
Context context,
CancellationToken cancellationToken,
ExceptionPredicates shouldHandleExceptionPredicates,
ResultPredicates<TResult> shouldHandleResultPredicates,
ICircuitController<TResult> breakerController)
{
...
}
}
这里你要注意的是ICircuitController
。不同策略执行之间的CircuitStateController
base class stores the state information. One of its derived class is shared
internal readonly ICircuitController<EmptyStruct> _breakerController;
...
CircuitBreakerEngine.Implementation(
action,
context,
cancellationToken,
ExceptionPredicates,
ResultPredicates,
_breakerController);
我希望这能澄清一些事情。