Semaphoreslim 处理每个时间段的节流
Semaphore slim to handle throttling per time period
我有一个客户的要求,要调用他们的 API,但是,由于节流限制,我们只能在一分钟内进行 100 个 API 调用。我正在使用 SemaphoreSlim 来处理它,这是我的代码。
async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}
然而这一行
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
永远不会执行,我也看到很多失败的响应,我想我也必须在代码中的某处添加 Task.Delay(1 分钟)。
您需要跟踪前 100 个请求中每个请求的执行时间。在下面的示例实现中,ConcurrentQueue<TimeSpan>
记录了前 100 个请求中每个请求的相对完成时间。通过从该队列中取出第一个(也是最早的)时间,您可以检查自 100 个请求前已经过去了多少时间。如果不到一分钟,那么下一个请求需要等待一分钟的剩余时间才能执行。
async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var total = 0;
var stopwatch = Stopwatch.StartNew();
var completionTimes = new ConcurrentQueue<TimeSpan>();
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
if (Interlocked.Increment(ref total) > 100)
{
completionTimes.TryDequeue(out var earliest);
var elapsed = stopwatch.Elapsed - earliest;
var delay = TimeSpan.FromSeconds(60) - elapsed;
if (delay > TimeSpan.Zero)
await Task.Delay(delay);
}
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
completionTimes.Enqueue(stopwatch.Elapsed);
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}
如果您从 WinForms 或 WPF 应用程序的 UI 线程调用此方法,请记住将 ConfigureAwait(false)
添加到其 await
语句中。
我有一个客户的要求,要调用他们的 API,但是,由于节流限制,我们只能在一分钟内进行 100 个 API 调用。我正在使用 SemaphoreSlim 来处理它,这是我的代码。
async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}
然而这一行
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
永远不会执行,我也看到很多失败的响应,我想我也必须在代码中的某处添加 Task.Delay(1 分钟)。
您需要跟踪前 100 个请求中每个请求的执行时间。在下面的示例实现中,ConcurrentQueue<TimeSpan>
记录了前 100 个请求中每个请求的相对完成时间。通过从该队列中取出第一个(也是最早的)时间,您可以检查自 100 个请求前已经过去了多少时间。如果不到一分钟,那么下一个请求需要等待一分钟的剩余时间才能执行。
async Task<List<IRestResponse>> GetAllResponses(List<string> locationApiCalls)
{
var semaphoreSlim = new SemaphoreSlim(initialCount: 100, maxCount: 100);
var total = 0;
var stopwatch = Stopwatch.StartNew();
var completionTimes = new ConcurrentQueue<TimeSpan>();
var failedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var passedResponses = new ConcurrentBag<IReadOnlyCollection<IRestResponse>>();
var tasks = locationApiCalls.Select(async locationApiCall =>
{
await semaphoreSlim.WaitAsync();
if (Interlocked.Increment(ref total) > 100)
{
completionTimes.TryDequeue(out var earliest);
var elapsed = stopwatch.Elapsed - earliest;
var delay = TimeSpan.FromSeconds(60) - elapsed;
if (delay > TimeSpan.Zero)
await Task.Delay(delay);
}
try
{
var response = await RestApi.GetResponseAsync(locationApi);
if (response.IsSuccessful)
{
passedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
else
{
failedResponses.Add((IReadOnlyCollection<IRestResponse>)response);
}
}
finally
{
completionTimes.Enqueue(stopwatch.Elapsed);
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
var passedResponsesList = passedResponses.SelectMany(x => x).ToList();
}
如果您从 WinForms 或 WPF 应用程序的 UI 线程调用此方法,请记住将 ConfigureAwait(false)
添加到其 await
语句中。