按时间和一些额外条件限制 C# 中传出请求的最佳方法?
Best way to throttle outgoing requests in C# by time and some extra criteria?
我必须调用一个外部 HTTP API,它只允许每个 userId 每 4 秒发出一个请求。只要每次调用发送的 userId 不同,我就可以随时调用它。
在这段代码中,我能够遵守外部 API 的速率限制,但做法并不是最优的:即使某个 userId 本身不需要等待,它的请求也会被先前的调用阻塞。(请参阅代码中的注释)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
/// <summary>Console entry point that runs the throttled sample calls.</summary>
class Program
{
    /// <summary>
    /// Runs the sample calls and then waits for a key press so the console
    /// window stays open.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        var caller = new ExternalAPICaller();
        // Await the returned task instead of discarding it, so an exception thrown
        // by any of the calls surfaces here rather than being silently lost.
        await caller.RunCalls();
        Console.ReadKey();
    }
}
/// <summary>
/// Sample caller that throttles calls to a mocked external HTTP API using a shared
/// semaphore and a shared list of user ids.
/// </summary>
public class ExternalAPICaller
{
// Static, so shared by every instance: limits how many calls may hold a slot at once.
private static SemaphoreSlim throttler = new SemaphoreSlim(20); // up to 20 concurrent calls
// User ids currently treated as rate limited. An id is added before its call and
// removed by the continuation scheduled in CallAPIWithThrottling.
private static List<string> userIds = new List<string>();
// Wait applied per user id; passed to Thread.Sleep and Task.Delay, i.e. milliseconds.
private const int rateLimitByUser = 4000;
/// <summary>
/// Performs one mocked API call for <paramref name="userId"/>, sleeping first when the
/// id is already listed in <c>userIds</c>.
/// </summary>
/// <param name="userId">User id the call is made for.</param>
/// <returns>
/// A task that completes once the mock call has been issued; it does not wait for the
/// delayed cleanup continuation.
/// </returns>
public async Task CallAPIWithThrottling(string userId)
{
// Thread.Sleep blocks the calling thread synchronously, so the caller cannot start
// anything else (including calls for other user ids) until the sleep is over.
if (userIds.Contains(userId)) Thread.Sleep(rateLimitByUser);
// Added unconditionally, so the same id can be listed more than once.
userIds.Add(userId);
await throttler.WaitAsync();
var task = MockHttpCall(userId);
// Fire-and-forget: the task returned by ContinueWith is discarded, so this method
// returns without waiting for the delay below.
_ = task.ContinueWith(async s =>
{
// Keep the semaphore slot and the listed id for rateLimitByUser ms after the call.
await Task.Delay(rateLimitByUser);
throttler.Release();
// NOTE(review): userIds is a plain List<string> modified here and in the method body
// above without any lock - confirm these can never run concurrently.
userIds.Remove(userId);
});
}
/// <summary>
/// Stand-in for the real HTTP request: writes a line to the console and blocks the
/// calling thread for 300 ms with Thread.Sleep.
/// </summary>
/// <param name="id">User id included in the console message.</param>
/// <returns>An already-completed task.</returns>
public Task MockHttpCall(string id)
{
Console.WriteLine("http call for " + id);
Thread.Sleep(300);
return Task.CompletedTask;
}
/// <summary>
/// Issues the sample calls one after another, awaiting each call before starting the next.
/// </summary>
public async Task RunCalls()
{
await CallAPIWithThrottling("Mike");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Sarah");
await CallAPIWithThrottling("Matt");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Jacob"); // this should be called right away, but the second John makes it wait
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Amy"); // this should be called right away, but the third John makes it wait
}
}
}
我会尝试把节流功能抽象出来,以便可以独立测试它。我会编写一个 Throttler 类,
它可以配置全局以及每个用户的并发限制和延迟。在您的情况下,配置为:
- 全局并发限制:20
- 全局延迟:0(允许不同用户同时请求)
- 每用户并发限制:1
- 每个用户延迟:4000
下面是 Throttler 类的一个实现。
为简单起见,省略了每个用户的并发限制
(那需要为每个用户再维护一个 SemaphoreSlim)。
/// <summary>
/// Throttles asynchronous operations with a global concurrency limit, a global delay
/// between consecutive acquisitions, and a per-key delay between consecutive
/// acquisitions for the same key.
/// </summary>
/// <typeparam name="TKey">Type of the key that operations are grouped by.</typeparam>
public class Throttler<TKey>
{
    private readonly SemaphoreSlim _globalConcurrencySemaphore;
    private readonly SemaphoreSlim _globalDelaySemaphore;
    private readonly int _globalDelay;
    private readonly int _perKeyDelay;
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _perKeyDelaySemaphores;

    /// <summary>Creates a throttler with the given limits.</summary>
    /// <param name="globalConcurrencyLimit">Maximum number of operations running at once; at least 1.</param>
    /// <param name="globalDelay">Milliseconds between two consecutive acquisitions overall; 0 or more.</param>
    /// <param name="perKeyDelay">Milliseconds between two consecutive acquisitions for one key; 0 or more.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside its valid range.</exception>
    public Throttler(int globalConcurrencyLimit, int globalDelay, int perKeyDelay)
    {
        if (globalConcurrencyLimit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(globalConcurrencyLimit));
        }
        // Reject negative delays up front: Task.Delay(-1) never completes, which would
        // leave the delay semaphore acquired forever, and smaller values make it throw.
        if (globalDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(globalDelay));
        }
        if (perKeyDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(perKeyDelay));
        }

        _globalConcurrencySemaphore = new SemaphoreSlim(globalConcurrencyLimit,
            globalConcurrencyLimit);
        _globalDelaySemaphore = new SemaphoreSlim(1, 1);
        _globalDelay = globalDelay;
        _perKeyDelay = perKeyDelay;
        _perKeyDelaySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>();
    }

    /// <summary>
    /// Waits for the per-key delay, the global delay and a free concurrency slot, then
    /// runs the operation created by <paramref name="taskFactory"/>.
    /// </summary>
    /// <typeparam name="TResult">Result type of the operation.</typeparam>
    /// <param name="key">Key the per-key delay applies to.</param>
    /// <param name="taskFactory">Creates the operation once throttling allows it to start.</param>
    /// <returns>The result of the operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
    public async Task<TResult> Execute<TResult>(TKey key,
        Func<Task<TResult>> taskFactory)
    {
        // Fail before any delay slot is consumed.
        if (taskFactory == null)
        {
            throw new ArgumentNullException(nameof(taskFactory));
        }

        var perKeyDelaySemaphore = _perKeyDelaySemaphores.GetOrAdd(
            key, _ => new SemaphoreSlim(1, 1));

        // Each delay semaphore is released by a timer rather than by the operation, so
        // the delay is measured from acquisition and ignores how long the operation runs.
        await perKeyDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAsync(perKeyDelaySemaphore, _perKeyDelay);

        await _globalDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAsync(_globalDelaySemaphore, _globalDelay);

        await _globalConcurrencySemaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            var task = taskFactory();
            return await task.ConfigureAwait(false);
        }
        finally
        {
            _globalConcurrencySemaphore.Release();
        }
    }

    /// <summary>Releases <paramref name="semaphore"/> after <paramref name="delay"/> milliseconds.</summary>
    /// <remarks>
    /// Returns a task instead of being <c>async void</c>, so a failure ends up in the
    /// task rather than being raised where no caller can observe it.
    /// </remarks>
    private static async Task ReleaseAsync(SemaphoreSlim semaphore, int delay)
    {
        await Task.Delay(delay).ConfigureAwait(false);
        semaphore.Release();
    }
}
这里的延迟指的是同一个信号量相邻两次获取之间的间隔,HTTP 调用本身的耗时不计入其中。
用法示例:
// Configure 20 concurrent calls, no global delay, and 4000 ms between calls per key.
var throttler = new Throttler<string>(20, 0, 4000);
string[] keys = { "Mike", "John", "Sarah", "Matt", "John", "Jacob", "John", "Amy" };

// Queue one throttled call per key; Execute is not awaited inside the loop, so every
// call is handed to the throttler up front.
var tasks = new Task[keys.Length];
for (var i = 0; i < keys.Length; i++)
{
    var key = keys[i];
    tasks[i] = throttler.Execute(key, () => MockHttpCall(key));
}

// Block until every queued call has finished.
Task.WaitAll(tasks);
// Stand-in for the real HTTP request: prints a timestamped line, waits 300 ms
// asynchronously and then yields 0.
async Task<int> MockHttpCall(string id)
{
    var now = DateTime.Now;
    Console.WriteLine($"{now:HH:mm:ss.fff} HTTP call for " + id);
    await Task.Delay(300);
    return 0;
}
输出:
11:20:41.635 HTTP call for Mike
11:20:41.652 HTTP call for John
11:20:41.652 HTTP call for Sarah
11:20:41.652 HTTP call for Matt
11:20:41.653 HTTP call for Jacob
11:20:41.654 HTTP call for Amy
11:20:45.965 HTTP call for John
11:20:50.272 HTTP call for John
我必须调用一个外部 HTTP API,它只允许每个 userId 每 4 秒发出一个请求。只要每次调用发送的 userId 不同,我就可以随时调用它。
在这段代码中,我能够遵守外部 API 的速率限制,但做法并不是最优的:即使某个 userId 本身不需要等待,它的请求也会被先前的调用阻塞。(请参阅代码中的注释)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
/// <summary>Console entry point that runs the throttled sample calls.</summary>
class Program
{
    /// <summary>
    /// Runs the sample calls and then waits for a key press so the console
    /// window stays open.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        var caller = new ExternalAPICaller();
        // Await the returned task instead of discarding it, so an exception thrown
        // by any of the calls surfaces here rather than being silently lost.
        await caller.RunCalls();
        Console.ReadKey();
    }
}
/// <summary>
/// Sample caller that throttles calls to a mocked external HTTP API using a shared
/// semaphore and a shared list of user ids.
/// </summary>
public class ExternalAPICaller
{
// Static, so shared by every instance: limits how many calls may hold a slot at once.
private static SemaphoreSlim throttler = new SemaphoreSlim(20); // up to 20 concurrent calls
// User ids currently treated as rate limited. An id is added before its call and
// removed by the continuation scheduled in CallAPIWithThrottling.
private static List<string> userIds = new List<string>();
// Wait applied per user id; passed to Thread.Sleep and Task.Delay, i.e. milliseconds.
private const int rateLimitByUser = 4000;
/// <summary>
/// Performs one mocked API call for <paramref name="userId"/>, sleeping first when the
/// id is already listed in <c>userIds</c>.
/// </summary>
/// <param name="userId">User id the call is made for.</param>
/// <returns>
/// A task that completes once the mock call has been issued; it does not wait for the
/// delayed cleanup continuation.
/// </returns>
public async Task CallAPIWithThrottling(string userId)
{
// Thread.Sleep blocks the calling thread synchronously, so the caller cannot start
// anything else (including calls for other user ids) until the sleep is over.
if (userIds.Contains(userId)) Thread.Sleep(rateLimitByUser);
// Added unconditionally, so the same id can be listed more than once.
userIds.Add(userId);
await throttler.WaitAsync();
var task = MockHttpCall(userId);
// Fire-and-forget: the task returned by ContinueWith is discarded, so this method
// returns without waiting for the delay below.
_ = task.ContinueWith(async s =>
{
// Keep the semaphore slot and the listed id for rateLimitByUser ms after the call.
await Task.Delay(rateLimitByUser);
throttler.Release();
// NOTE(review): userIds is a plain List<string> modified here and in the method body
// above without any lock - confirm these can never run concurrently.
userIds.Remove(userId);
});
}
/// <summary>
/// Stand-in for the real HTTP request: writes a line to the console and blocks the
/// calling thread for 300 ms with Thread.Sleep.
/// </summary>
/// <param name="id">User id included in the console message.</param>
/// <returns>An already-completed task.</returns>
public Task MockHttpCall(string id)
{
Console.WriteLine("http call for " + id);
Thread.Sleep(300);
return Task.CompletedTask;
}
/// <summary>
/// Issues the sample calls one after another, awaiting each call before starting the next.
/// </summary>
public async Task RunCalls()
{
await CallAPIWithThrottling("Mike");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Sarah");
await CallAPIWithThrottling("Matt");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Jacob"); // this should be called right away, but the second John makes it wait
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Amy"); // this should be called right away, but the third John makes it wait
}
}
}
我会尝试把节流功能抽象出来,以便可以独立测试它。我会编写一个 Throttler 类,
它可以配置全局以及每个用户的并发限制和延迟。在您的情况下,配置为:
- 全局并发限制:20
- 全局延迟:0(允许不同用户同时请求)
- 每用户并发限制:1
- 每个用户延迟:4000
下面是 Throttler 类的一个实现。
为简单起见,省略了每个用户的并发限制
(那需要为每个用户再维护一个 SemaphoreSlim)。
/// <summary>
/// Throttles asynchronous operations with a global concurrency limit, a global delay
/// between consecutive acquisitions, and a per-key delay between consecutive
/// acquisitions for the same key.
/// </summary>
/// <typeparam name="TKey">Type of the key that operations are grouped by.</typeparam>
public class Throttler<TKey>
{
    private readonly SemaphoreSlim _globalConcurrencySemaphore;
    private readonly SemaphoreSlim _globalDelaySemaphore;
    private readonly int _globalDelay;
    private readonly int _perKeyDelay;
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _perKeyDelaySemaphores;

    /// <summary>Creates a throttler with the given limits.</summary>
    /// <param name="globalConcurrencyLimit">Maximum number of operations running at once; at least 1.</param>
    /// <param name="globalDelay">Milliseconds between two consecutive acquisitions overall; 0 or more.</param>
    /// <param name="perKeyDelay">Milliseconds between two consecutive acquisitions for one key; 0 or more.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside its valid range.</exception>
    public Throttler(int globalConcurrencyLimit, int globalDelay, int perKeyDelay)
    {
        if (globalConcurrencyLimit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(globalConcurrencyLimit));
        }
        // Reject negative delays up front: Task.Delay(-1) never completes, which would
        // leave the delay semaphore acquired forever, and smaller values make it throw.
        if (globalDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(globalDelay));
        }
        if (perKeyDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(perKeyDelay));
        }

        _globalConcurrencySemaphore = new SemaphoreSlim(globalConcurrencyLimit,
            globalConcurrencyLimit);
        _globalDelaySemaphore = new SemaphoreSlim(1, 1);
        _globalDelay = globalDelay;
        _perKeyDelay = perKeyDelay;
        _perKeyDelaySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>();
    }

    /// <summary>
    /// Waits for the per-key delay, the global delay and a free concurrency slot, then
    /// runs the operation created by <paramref name="taskFactory"/>.
    /// </summary>
    /// <typeparam name="TResult">Result type of the operation.</typeparam>
    /// <param name="key">Key the per-key delay applies to.</param>
    /// <param name="taskFactory">Creates the operation once throttling allows it to start.</param>
    /// <returns>The result of the operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
    public async Task<TResult> Execute<TResult>(TKey key,
        Func<Task<TResult>> taskFactory)
    {
        // Fail before any delay slot is consumed.
        if (taskFactory == null)
        {
            throw new ArgumentNullException(nameof(taskFactory));
        }

        var perKeyDelaySemaphore = _perKeyDelaySemaphores.GetOrAdd(
            key, _ => new SemaphoreSlim(1, 1));

        // Each delay semaphore is released by a timer rather than by the operation, so
        // the delay is measured from acquisition and ignores how long the operation runs.
        await perKeyDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAsync(perKeyDelaySemaphore, _perKeyDelay);

        await _globalDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAsync(_globalDelaySemaphore, _globalDelay);

        await _globalConcurrencySemaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            var task = taskFactory();
            return await task.ConfigureAwait(false);
        }
        finally
        {
            _globalConcurrencySemaphore.Release();
        }
    }

    /// <summary>Releases <paramref name="semaphore"/> after <paramref name="delay"/> milliseconds.</summary>
    /// <remarks>
    /// Returns a task instead of being <c>async void</c>, so a failure ends up in the
    /// task rather than being raised where no caller can observe it.
    /// </remarks>
    private static async Task ReleaseAsync(SemaphoreSlim semaphore, int delay)
    {
        await Task.Delay(delay).ConfigureAwait(false);
        semaphore.Release();
    }
}
这里的延迟指的是同一个信号量相邻两次获取之间的间隔,HTTP 调用本身的耗时不计入其中。
用法示例:
// Configure 20 concurrent calls, no global delay, and 4000 ms between calls per key.
var throttler = new Throttler<string>(20, 0, 4000);
string[] keys = { "Mike", "John", "Sarah", "Matt", "John", "Jacob", "John", "Amy" };

// Queue one throttled call per key; Execute is not awaited inside the loop, so every
// call is handed to the throttler up front.
var tasks = new Task[keys.Length];
for (var i = 0; i < keys.Length; i++)
{
    var key = keys[i];
    tasks[i] = throttler.Execute(key, () => MockHttpCall(key));
}

// Block until every queued call has finished.
Task.WaitAll(tasks);
// Stand-in for the real HTTP request: prints a timestamped line, waits 300 ms
// asynchronously and then yields 0.
async Task<int> MockHttpCall(string id)
{
    var now = DateTime.Now;
    Console.WriteLine($"{now:HH:mm:ss.fff} HTTP call for " + id);
    await Task.Delay(300);
    return 0;
}
输出:
11:20:41.635 HTTP call for Mike
11:20:41.652 HTTP call for John
11:20:41.652 HTTP call for Sarah
11:20:41.652 HTTP call for Matt
11:20:41.653 HTTP call for Jacob
11:20:41.654 HTTP call for Amy
11:20:45.965 HTTP call for John
11:20:50.272 HTTP call for John