如何在 .net Core API 项目中跨多个线程限制对 HttpClient 的所有传出异步调用

How to Throttle all outgoing asynchronous calls to HttpClient across multiple threads in .net Core API project

我正在设计一个 .net 核心网络 api,它使用我无法控制的外部 api。我发现了一些关于堆栈溢出的优秀答案,这些答案允许我在同一线程中使用 semaphoreslim 限制我对这个外部 API 的请求。我想知道如何最好地将这种限制扩展到应用程序范围,而不是仅仅针对特定的任务列表进行限制。我一直在学习 HttpMessageHandlers,这似乎是拦截所有传出消息并应用限制的可能方法。但是我担心我可能不理解的线程安全和锁定问题。我包括了我当前的节流代码,希望这可能有助于理解我正在尝试做的事情,但是跨多个线程,并且不断添加任务而不是预定义的任务列表。

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}

概念性问题

实现简单

所以 ThrottlingDelegatingHandler 可能看起来像这样:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

创建并维护一个实例作为单例:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 

DelegatingHandler 应用到要并行调节调用的 HttpClient 的所有实例:

HttpClient throttledClient = new HttpClient(throttle);

HttpClient 不需要是单例:只有 throttle 实例需要。

为简洁起见,我省略了 Dot Net Core DI 代码,但您可以使用 .Net Core 的容器注册单例 ThrottlingDelegatingHandler 实例,在使用点通过 DI 获取该单例,然后使用它在 HttpClient 中构建,如上所示。

但是:

更好的实现:使用 HttpClientFactory (.NET Core 2.1+)

以上仍然回避了您将如何管理 HttpClient 生命周期的问题:

  • 单例(应用范围)HttpClients do not pick up DNS updates。您的应用程序将忽略 DNS 更新,除非您终止并重新启动它(可能不受欢迎)。
  • 一种频繁创建和处置的模式,using (HttpClient client = ) { },另一方面,can cause socket exhaustion

HttpClientFactory 的设计目标之一是管理 HttpClient 实例及其委托处理程序的生命周期,以避免这些问题。

在 .NET Core 2.1 中,您可以使用 HttpClientFactory 将其全部连接到 Startup class 中的 ConfigureServices(IServiceCollection services),如下所示:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

(这里的“MyThrottledClient”是 named-client approach just to keep this example short; typed clients 避免字符串命名。)

在使用点,通过 DI (reference) 获得一个 IHttpClientFactory,然后调用

var client = _clientFactory.CreateClient("MyThrottledClient");

获取预配置了单例的 HttpClient 实例 ThrottlingDelegatingHandler

通过以这种方式获得的 HttpClient 实例的所有调用将被限制(通常,在整个应用程序中)到最初配置的 int maxParallelism

HttpClientFactory 神奇地处理了所有 HttpClient 生命周期问题。

更好的实现:将 Polly 与 IHttpClientFactory 结合使用以获得所有这些 'out-of-the-box'

波莉是 deeply integrated with IHttpClientFactory and Polly also provides Bulkhead policy which works as a parallelism throttle by an identical SemaphoreSlim mechanism

因此,作为手动滚动 ThrottlingDelegatingHandler 的替代方法,您还可以直接将 Polly Bulkhead 策略与 IHttpClientFactory 一起使用。在您的 Startup class 中,只需:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

像之前一样从 HttpClientFactory 获取预配置的 HttpClient 实例。和以前一样,通过这样一个“MyThrottledClient”HttpClient 实例的所有调用都将被并行限制到配置的 maxParallelism.

Polly Bulkhead 策略还提供了配置您希望同时允许多少操作的功能 'queue' 主信号量中的执行槽。因此,例如:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);

当如上配置为 HttpClient 时,将允许 10 个并行 http 调用,以及最多 100 个对 'queue' 的 http 调用用于一个执行槽。这可以通过防止出现故障的下游系统导致上游排队调用的资源过多膨胀,从而为高吞吐量系统提供额外的弹性。

要将 Polly 选项与 HttpClientFactory 结合使用,请引入 Microsoft.Extensions.Http.Polly and Polly nuget 包。

参考文献:Polly deep doco on Polly and IHttpClientFactory; Bulkhead policy.


关于任务的附录

问题使用 Task.Run(...) 并提到:

a .net core web api that consumes an external api

和:

with tasks being continuously added instead of a pre-defined list of tasks.

如果您的 .net 核心网站 api 每次请求仅消耗外部 API 一次 .net 核心网站 api 处理,并且您采用了本答案其余部分中讨论的方法,将下游外部 http 调用卸载到新的 TaskTask.Run(...) 将是不必要的,只会在额外的 Task 实例和线程中产生开销-交换。 dot net core 已经运行 线程池上的多个线程传入请求。