Semaphoreslim 处理每个时间段的节流

Semaphore slim to handle throttling per time period

我有一个客户的要求,要调用他们的 API,但是,由于节流限制,我们只能在一分钟内进行 100 个 API 调用。我正在使用 SemaphoreSlim 来处理它,这是我的代码。

 async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls) 
 {
     var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
     var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
     var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();

     var tasks = locationApiCalls.Select(async locationApiCall =>
     {
          await semaphoreSlim.WaitAsync();
          try
          {
              var response = await RestApi.GetResponseAsync(locationApi);
              if (response.IsSuccessful)
              {
                  passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
              }
              else
              {
                 failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
              }
           }
           finally
           {
               semaphoreSlim.Release();
           }
     });

     await Task.WhenAll(tasks);
     var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}

然而这一行

var passedResponsesList = passedResponses.SelectMany(x => x).ToList();

永远不会执行,我也看到很多失败的响应,我想我也必须在代码中的某处添加 Task.Delay(1 分钟)。

您需要跟踪前 100 个请求中每个请求的执行时间。在下面的示例实现中,ConcurrentQueue<TimeSpan> 记录了前 100 个请求中每个请求的相对完成时间。通过从该队列中取出第一个(也是最早的)时间,您可以检查自 100 个请求前已经过去了多少时间。如果不到一分钟,那么下一个请求需要等待一分钟的剩余时间才能执行。

async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
    var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
    var total = 0;
    var stopwatch = Stopwatch.StartNew();
    var completionTimes = new ConcurrentQueue<TimeSpan>();

    var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
    var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();

    var tasks = locationApiCalls.Select(async locationApiCall =>
    {
        await semaphoreSlim.WaitAsync();

        if (Interlocked.Increment(ref total) > 100)
        {
            completionTimes.TryDequeue(out var earliest);
            var elapsed = stopwatch.Elapsed - earliest;
            var delay = TimeSpan.FromSeconds(60) - elapsed;
            if (delay > TimeSpan.Zero)
                await Task.Delay(delay);
        }

        try
        {
            var response = await RestApi.GetResponseAsync(locationApi);
            if (response.IsSuccessful)
            {
                passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
            }
            else
            {
                failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
            }
        }
        finally
        {
            completionTimes.Enqueue(stopwatch.Elapsed);
            semaphoreSlim.Release();
        }
    });

    await Task.WhenAll(tasks);
    var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}

如果您从 WinForms 或 WPF 应用程序的 UI 线程调用此方法,请记住将 ConfigureAwait(false) 添加到其 await 语句中。