按时间和一些额外条件限制 C# 中传出请求的最佳方法?

Best way to throttle outgoing requests in C# by time and some extra criteria?

我必须调用一个外部 HTTP API,它对每个 userId 限制为每 4 秒只能发出一个请求。只要每次调用使用的是不同的 userId,我就可以随时调用它。

在此代码中,我能够遵守外部 API 的速率限制,但做法并不是最优的,因为有些请求会被之前的调用阻塞,即使该请求的 userId 本身并不需要等待。(参见代码中的注释)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    /// <summary>
    /// Console entry point for the throttling sample.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the sample call sequence to completion, then waits for a key press
        /// so the console output stays visible.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var caller = new ExternalAPICaller();

            // Wait for the returned task instead of discarding it: a discarded Task
            // hides any exception thrown inside RunCalls and lets Main continue
            // before the calls have finished. Blocking here is acceptable because
            // this is the entry point of a console application.
            caller.RunCalls().GetAwaiter().GetResult();

            Console.ReadKey();
        }
    }

    /// <summary>
    /// Calls a (mocked) external HTTP API while limiting the overall number of
    /// in-flight calls and spacing out repeated calls for the same user id.
    /// </summary>
    /// <remarks>
    /// A user id is remembered from the moment its call is issued until
    /// <see cref="rateLimitByUser"/> milliseconds after that call has finished.
    /// A caller that finds its user id still remembered waits one full
    /// rate-limit interval before proceeding. The concurrency slot is likewise
    /// held until the cool-down has elapsed.
    /// </remarks>
    public class ExternalAPICaller
    {
        private static readonly SemaphoreSlim throttler = new SemaphoreSlim(20); // up to 20 concurrent calls

        // User ids whose cool-down has not elapsed yet. The list is touched both by
        // callers and by the background cool-down tasks, so every access goes
        // through userIdsLock.
        private static readonly List<string> userIds = new List<string>();

        private static readonly object userIdsLock = new object();

        // Minimum spacing, in milliseconds, between two calls for the same user id.
        private const int rateLimitByUser = 4000;

        /// <summary>
        /// Issues one API call for <paramref name="userId"/>, first waiting out the
        /// per-user interval when that user id was called recently.
        /// </summary>
        /// <param name="userId">User id the call is made for.</param>
        /// <returns>A task that completes once the HTTP call itself has completed.</returns>
        public async Task CallAPIWithThrottling(string userId)
        {
            bool calledRecently;
            lock (userIdsLock)
            {
                calledRecently = userIds.Contains(userId);
            }

            if (calledRecently)
            {
                // Asynchronous wait: Thread.Sleep would block the calling thread
                // for the whole interval.
                await Task.Delay(rateLimitByUser);
            }

            lock (userIdsLock)
            {
                userIds.Add(userId);
            }

            await throttler.WaitAsync();

            try
            {
                await MockHttpCall(userId);
            }
            finally
            {
                // Start the cool-down even when the call failed, so neither the
                // semaphore slot nor the user id entry is leaked.
                _ = ReleaseAfterCooldownAsync(userId);
            }
        }

        /// <summary>
        /// Waits for the per-user interval, then frees the concurrency slot and
        /// forgets one entry of <paramref name="userId"/>.
        /// </summary>
        private static async Task ReleaseAfterCooldownAsync(string userId)
        {
            await Task.Delay(rateLimitByUser).ConfigureAwait(false);
            throttler.Release();
            lock (userIdsLock)
            {
                userIds.Remove(userId);
            }
        }

        /// <summary>
        /// Stand-in for the real HTTP request: logs the id and takes about 300 ms.
        /// </summary>
        /// <param name="id">Identifier written to the console.</param>
        /// <returns>A task that completes when the simulated call is done.</returns>
        public async Task MockHttpCall(string id)
        {
            Console.WriteLine("http call for " + id);
            await Task.Delay(300);
        }

        /// <summary>
        /// Issues the sample sequence of calls one after another; each call is
        /// awaited before the next one is started.
        /// </summary>
        public async Task RunCalls()
        {
            await CallAPIWithThrottling("Mike");
            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Sarah");
            await CallAPIWithThrottling("Matt");

            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Jacob"); // this should be called right away, but the second John makes it wait

            await CallAPIWithThrottling("John");
            await CallAPIWithThrottling("Amy"); // this should be called right away, but the third John makes it wait
        }
    }
}

我会尝试把节流功能抽象出来,以便可以独立地对它进行测试。我会编写一个 Throttler 类,它可以配置全局的以及每个用户的并发限制和延迟。在您的情况下,配置为:

  • 全局并发限制:20
  • 全局延迟:0(允许不同用户同时请求)
  • 每用户并发限制:1
  • 每个用户延迟:4000

这里是 Throttler class 的一个实现。为简单起见,省略了每个用户的并发限制(每个用户需要第二个 SemaphoreSlim)。

/// <summary>
/// Throttles asynchronous operations with a global concurrency limit, a global
/// delay between consecutive operations, and a per-key delay between consecutive
/// operations that share the same key.
/// </summary>
/// <remarks>
/// Each delay is measured from one semaphore acquisition to the next; the
/// duration of the operation itself is not part of it. One semaphore is kept
/// per distinct key and is never removed from the dictionary.
/// </remarks>
/// <typeparam name="TKey">Type of the key that operations are grouped by.</typeparam>
public class Throttler<TKey>
{
    private readonly SemaphoreSlim _globalConcurrencySemaphore;
    private readonly SemaphoreSlim _globalDelaySemaphore;
    private readonly int _globalDelay;
    private readonly int _perKeyDelay;
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _perKeyDelaySemaphores;

    /// <summary>
    /// Creates a throttler.
    /// </summary>
    /// <param name="globalConcurrencyLimit">Maximum number of operations running at the same time.</param>
    /// <param name="globalDelay">Minimum time, in milliseconds, between the starts of any two operations.</param>
    /// <param name="perKeyDelay">Minimum time, in milliseconds, between the starts of two operations with the same key.</param>
    /// <exception cref="ArgumentOutOfRangeException">A delay is negative.</exception>
    public Throttler(int globalConcurrencyLimit, int globalDelay, int perKeyDelay)
    {
        // Reject bad delays here, where the caller can see the failure, rather
        // than inside a background release task.
        if (globalDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(globalDelay), "Delay must not be negative.");
        }

        if (perKeyDelay < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(perKeyDelay), "Delay must not be negative.");
        }

        _globalConcurrencySemaphore = new SemaphoreSlim(globalConcurrencyLimit,
             globalConcurrencyLimit);
        _globalDelaySemaphore = new SemaphoreSlim(1, 1);
        _globalDelay = globalDelay;
        _perKeyDelay = perKeyDelay;
        _perKeyDelaySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>();
    }

    /// <summary>
    /// Runs the operation produced by <paramref name="taskFactory"/> once the
    /// per-key delay, the global delay and the global concurrency limit allow it.
    /// </summary>
    /// <typeparam name="TResult">Result type of the operation.</typeparam>
    /// <param name="key">Key used for the per-key delay.</param>
    /// <param name="taskFactory">Factory that starts the operation; invoked only after all limits are satisfied.</param>
    /// <returns>The result of the operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
    public async Task<TResult> Execute<TResult>(TKey key,
        Func<Task<TResult>> taskFactory)
    {
        // Fail before any semaphore is taken; otherwise a null factory would burn
        // the key's delay slot and only then fail.
        if (taskFactory == null)
        {
            throw new ArgumentNullException(nameof(taskFactory));
        }

        var perKeyDelaySemaphore = _perKeyDelaySemaphores.GetOrAdd(
            key, unusedKey => new SemaphoreSlim(1, 1));

        // Each delay semaphore is released by a timer started at acquisition, not
        // by the end of the operation, so it only spaces out the start times.
        await perKeyDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAfterDelayAsync(perKeyDelaySemaphore, _perKeyDelay);
        await _globalDelaySemaphore.WaitAsync().ConfigureAwait(false);
        _ = ReleaseAfterDelayAsync(_globalDelaySemaphore, _globalDelay);

        await _globalConcurrencySemaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            var task = taskFactory();
            return await task.ConfigureAwait(false);
        }
        finally
        {
            _globalConcurrencySemaphore.Release();
        }
    }

    /// <summary>
    /// Releases <paramref name="semaphore"/> after <paramref name="delay"/> milliseconds.
    /// </summary>
    /// <remarks>
    /// Returns a Task rather than being <c>async void</c>, so an unexpected
    /// exception cannot tear down the process.
    /// </remarks>
    private static async Task ReleaseAfterDelayAsync(SemaphoreSlim semaphore, int delay)
    {
        await Task.Delay(delay).ConfigureAwait(false);
        semaphore.Release();
    }
}

这里的延迟是指一次信号量获取与下一次信号量获取之间的间隔,HTTP 调用本身的耗时不计算在内。

用法示例:

// Throttler arguments, in order: global concurrency limit 20, global delay 0 ms,
// per-key delay 4000 ms.
var throttler = new Throttler<string>(20, 0, 4000);
var keys = new string[]
{
    "Mike", "John", "Sarah", "Matt", "John", "Jacob", "John", "Amy"
};

// Hand every call to the throttler up front and collect the resulting tasks.
var tasks = new List<Task>();
for (var index = 0; index < keys.Length; index++)
{
    var key = keys[index]; // per-iteration copy so each lambda captures its own key
    tasks.Add(throttler.Execute(key, () => MockHttpCall(key)));
}

// Block until all calls have finished.
Task.WaitAll(tasks.ToArray());

// Simulates an HTTP request: writes a timestamped line for the given id,
// waits 300 ms asynchronously, and returns 0 as a placeholder result.
async Task<int> MockHttpCall(string id)
{
    var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
    Console.WriteLine(timestamp + " HTTP call for " + id);

    await Task.Delay(300);

    return 0;
}

输出:

11:20:41.635 HTTP call for Mike
11:20:41.652 HTTP call for John
11:20:41.652 HTTP call for Sarah
11:20:41.652 HTTP call for Matt
11:20:41.653 HTTP call for Jacob
11:20:41.654 HTTP call for Amy
11:20:45.965 HTTP call for John
11:20:50.272 HTTP call for John