处理并发 http 请求时的 IHttpClient Polly 超时和 WaitAndRetry 策略说明

IHttpClient Polly Timeout and WaitAndRetry policy when handling concurrent http requests Clarification

只是想问一个关于 Pollys timeout/retry 政策的问题,以及它在处理并发 http 请求时的工作方式。

根据自己的理解,超时和重试策略将应用于每个单独的 http 请求,因此如果我们有 5 个 http 请求,每个请求都会有自己的超时和重试策略,因此从每个 http 请求下面的代码将在 5 秒后超时并重试总共 4 次。

    public override void Configure(IFunctionsHostBuilder builder)
    {
        
        var timeout = Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(5));

        builder.Services
            .AddHttpClient("PointsbetClient")
            .AddPolicyHandler(GetRetryPolicy())
            .AddPolicyHandler(timeout);
    }

    private static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
    {
        return HttpPolicyExtensions
            .HandleTransientHttpError()
            .Or<TimeoutRejectedException>()
            .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
                medianFirstRetryDelay: TimeSpan.FromMilliseconds(500),
                retryCount: 4));
    }

现在假设我有 1000 个 http 请求,我需要对其进行 GetAsync() 调用,以便我可以抓取它们的数据,出于性能目的,我使用 await Task.WhenAll(tasks); 同时进行这些调用。由于 1000 一次请求太多,我使用 SemaphoreSlim class 并将 MaxParallelRequests 限制为 100.

现在如何应用 Polly 重试和超时策略?它是否仍适用于这 1000 个请求中的每一个,或者它将包含 100 个请求的 1 个任务视为单个 timeout/retry 策略?根据我的理解,它仍然会处理每个单独的 http 请求并将策略应用于每个单独的 http 请求,我一直在搜索但找不到关于此的确认。

简短的回答是,它们是分开处理的。


为了了解系统的工作原理,我们必须深入了解。让我们从 AddPolicyHandler.

开始我们的旅程吧

免责声明为了简洁起见,我稍微编辑了代码片段。

public static IHttpClientBuilder AddPolicyHandler(this IHttpClientBuilder builder, IAsyncPolicy<HttpResponseMessage> policy)
{
    if (builder == null) throw new ArgumentNullException(nameof(builder));
    if (policy == null) throw new ArgumentNullException(nameof(policy));
    builder.AddHttpMessageHandler(() => new PolicyHttpMessageHandler(policy));
    return builder;
}

此方法定义在PollyHttpClientBuilderExtensions class 中,它为IHttpClientBuilder.

提供扩展方法

如您所见,它只是将另一个 HttpMessageHandler 注册到链中。

现在,让我们看看这个特殊的处理程序长什么样子

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
    if (request == null) throw new ArgumentNullException(nameof(request));

    // Guarantee the existence of a context for every policy execution, 
    // but only create a new one if needed.
    // This allows later handlers to flow state if desired.
    var cleanUpContext = false;
    var context = request.GetPolicyExecutionContext();
    if (context == null)
    {
        context = new Context();
        request.SetPolicyExecutionContext(context);
        cleanUpContext = true;
    }

    HttpResponseMessage response;
    try
    {
        var policy = _policy ?? SelectPolicy(request);
        response = await policy.ExecuteAsync((c, ct) => SendCoreAsync(request, c, ct), context, cancellationToken).ConfigureAwait(false);
    }
    finally
    {
        if (cleanUpContext)
            request.SetPolicyExecutionContext(null);
    }

    return response;
}

这个方法是在PolicyHttpMessageHandler里面定义的。

如您所见,这里没有发生任何异常情况

  • 我们要么检索要么创建一个新的上下文
  • 我们要么从注册表中检索策略,要么使用提供的策略
  • 我们执行装饰 SendCoreAsync
  • 的策略

那么,魔法发生在哪里呢?让我们跳转这个 class

documentation comment

All policies provided by Polly are designed to be efficient when used in a long-lived way. Certain policies such as the Bulkhead and Circuit-Breaker maintain state and should be scoped across calls you wish to share the Bulkhead or Circuit-Breaker state. Take care to ensure the correct lifetimes when using policies and message handlers together in custom scenarios. The extension methods provided by PollyHttpClientBuilderExtensions are designed to assign a long lifetime to policies and ensure that they can be used when the handler rotation feature is active.

要了解 Retry 与 Circuit Breaker 有何不同,让我们看看他们的 EngineImplementation 签名

RetryEngine

internal static class RetryEngine
{
    internal static TResult Implementation<TResult>(
        Func<Context, CancellationToken, TResult> action,
        Context context,
        CancellationToken cancellationToken,
        ExceptionPredicates shouldRetryExceptionPredicates,
        ResultPredicates<TResult> shouldRetryResultPredicates,
        Action<DelegateResult<TResult>, TimeSpan, int, Context> onRetry,
        int permittedRetryCount = Int32.MaxValue,
        IEnumerable<TimeSpan> sleepDurationsEnumerable = null,
        Func<int, DelegateResult<TResult>, Context, TimeSpan> sleepDurationProvider = null)
    {
    ...
    }
}

CircuitBreakerEngine

internal class CircuitBreakerEngine
{
    internal static TResult Implementation<TResult>(
        Func<Context, CancellationToken, TResult> action,
        Context context,
        CancellationToken cancellationToken,
        ExceptionPredicates shouldHandleExceptionPredicates, 
        ResultPredicates<TResult> shouldHandleResultPredicates, 
        ICircuitController<TResult> breakerController)
    {
    ...
    }
}

这里你要注意的是ICircuitController。不同策略执行之间的CircuitStateController base class stores the state information. One of its derived class is shared

internal readonly ICircuitController<EmptyStruct> _breakerController;
...
CircuitBreakerEngine.Implementation(
    action,
    context,
    cancellationToken,
    ExceptionPredicates,
    ResultPredicates,
    _breakerController);

我希望这能澄清一些事情。