如何在 .Net Core Web API 中限制并发外部 API 调用?

How to limit concurrent external API calls in .Net Core Web API?

目前我正在开发一个 .net 核心网络 api 项目,它正在从外部网络 api 获取数据。他们最后有一个 25 的并发速率限制器(允许 25 个并发 api 调用)。第 26 次 API 调用将失败。

所以我想在我的网络 API 项目上实施并发 API 速率限制器,并且需要跟踪第 26 次 API 失败的调用并需要重试(它可能获取或 post 调用)。我的 api 代码

中有多个获取请求和 post 请求

以下是我的 httpservice.cs 在我的网站 api

public HttpClient GetHttpClient()
{
    HttpClient client = new HttpClient
    {
        BaseAddress = new Uri(APIServer),
    };
    client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
    client.DefaultRequestHeaders.Add("Authorization", ("Bearer " + Access_Token));
    return client;
}
private HttpClient Client;
public async Task<Object> Get(string apiEndpoint)
{

    Client = GetHttpClient();
    HttpResponseMessage httpResponseMessage = await Client.GetAsync(apiEndpoint);
    if (httpResponseMessage.IsSuccessStatusCode)
    {
        Object response = await httpResponseMessage.Content.ReadAsStringAsync();
        return response;
    }
    else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        //need to track failed calls                    
        return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
    }
}

public async Task<Object> Post(string apiEndpoint, Object request)
{
    Client = GetHttpClient();
    HttpResponseMessage httpResponseMessage = await Client.PostAsJsonAsync(apiEndpoint, request);
    if (httpResponseMessage.IsSuccessStatusCode)
    {
        return await httpResponseMessage.Content.ReadAsAsync<Object>();
    }

    else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        //need to track
        return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
    }
} 

如何在上面的示例中限制并发 api 调用

SemaphoreSlim _semaphoregate = new SemaphoreSlim(25);
await _semaphoregate.WaitAsync();      
_semaphoregate.Release();  

这行得通吗?

AspNetCoreRateLimit nuget 包在这里有用吗?这会限制上述示例的并发吗?

请帮忙。

我所知道的限制对一段代码的并发访问数量的最简单的解决方案是使用 SemaphoreSlim 对象,以实现节流机制。

您可以考虑下面的方法,您应该根据自己的实际情况进行调整(以下代码比较简单,只是为了给您展示大概的思路):

public class Program 
{
    private static async Task DoSomethingAsync()
    {
      // this is the code for which you want to limit the concurrent execution
    }

    // this is meant to guarantee at most 5 concurrent execution of the code in DoSomethingAsync
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(5); 

    // here we execute 100 calls to DoSomethingAsync, by ensuring that at most 5 calls are executed concurrently
    public static async Task Main(string[] args) 
    {
        var tasks = new List<Task>();
        
        for(int i = 0; i < 100; i++) 
        {
            tasks.Add(ThrottledDoSomethingAsync());
        }
        
        await Task.WhenAll(tasks);
    }

    private static async Task ThrottledDoSomethingAsync()
    {
      await _semaphore.WaitAsync();
      
      try
      {
        await DoSomethingAsync();
      }
      finally
      {
        _semaphore.Release();
      }
    }
}

Here 您可以找到 SemaphoreSlim class.

的文档

如果您想要 ForEachAsync 方法之类的东西,可以考虑阅读有关该主题的 my own question

如果您正在寻找使用 SemaphoreSlim 作为服务节流机制的优雅解决方案,您可以考虑为服务本身定义一个接口并使用 装饰器模式.在装饰器中,您可以使用 SemaphoreSlim 实现节流逻辑,如上所示,同时在服务的核心实现中保持服务逻辑简单且不受影响。这与您的问题并不严格相关,它只是写下您的 HTTP 服务的实际实现的提示。 SemaphoreSlim 用作节流机制的核心思想是上面代码中显示的那个。

调整您的代码的最低限度如下:

public sealed class HttpService
{
    // this must be static in order to be shared between different instances
    // this code is based on a max of 25 concurrent requests to the API
    // both GET and POST requests are taken into account (they are globally capped to a maximum of 25 concurrent requests to the API)
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(25);

    public HttpClient GetHttpClient()
    {
        HttpClient client = new HttpClient
        {
            BaseAddress = new Uri(APIServer),
        };
        client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        client.DefaultRequestHeaders.Add("Authorization", ("Bearer " + Access_Token));
        return client;
    }

    private HttpClient Client;

    public async Task<Object> Get(string apiEndpoint)
    {

        Client = GetHttpClient();
        HttpResponseMessage httpResponseMessage = await this.ExecuteGetRequest(apiEndpoint);
        if (httpResponseMessage.IsSuccessStatusCode)
        {
            Object response = await httpResponseMessage.Content.ReadAsStringAsync();
            return response;
        }
        else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        {
            //need to track failed calls                    
            return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
        }
    }

    private async Task<HttpResponseMessage> ExecuteGetRequest(string url)
    {
        await _semaphore.WaitAsync();

        try
        {
            return await this.Client.GetAsync(url);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async Task<Object> Post(string apiEndpoint, Object request)
    {
        Client = GetHttpClient();
        HttpResponseMessage httpResponseMessage = await this.ExecutePostRequest(apiEndpoint, request);
        if (httpResponseMessage.IsSuccessStatusCode)
        {
            return await httpResponseMessage.Content.ReadAsAsync<Object>();
        }

        else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        {
            //need to track
            return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
        }
    }

    private async Task<HttpResponseMessage> ExecutePostRequest(string url, Object request)
    {
        await _semaphore.WaitAsync();

        try
        {
            return await this.Client.PostAsJsonAsync(url, request);
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

重要说明:每次您需要对 API 执行 HTTP 请求时,您发布的代码都会创建一个全新的 HttpClient 实例。由于超出您的问题范围的原因,这是有问题的。我强烈建议您也阅读 this article and this one