如何使用 Observables 构建速率限制 API?

How to build a rate-limiting API with Observables?

我想创建一个简单的计算器服务,它只有一种方法可以添加数字。这个 Add 方法应该是 async 并且必须限制在给定时间进行的并发调用的数量。例如,每秒不超过 5 个并发调用。如果超过速率限制,调用应该抛出异常。

class 应该是这样的:

public class RateLimitingCalculator
{
    public async Task<int> Add(int a, int b) 
    {
        //...
    }
}

有什么想法吗?我想用 Reactive Extensions 来实现它,但如果使用其他策略更好,我会坚持下去!

我认为在这里使用 Rx 没有意义,除非您可以按照 Enigmativity 在评论中的建议将您的方法重写为 public IObservable<int> Add(IObservable<Tuple<int, int>> values) 之类的东西。

我要做的是将速率限制的问题分离到一个单独的 class 中。这样,您的代码可能如下所示:

public class RateLimitingCalculator
{
    private RateLimiter rateLimiter = new RateLimiter(5, TimeSpan.FromSeconds(1));

    public async Task<int> Add(int a, int b) 
    {
        rateLimiter.ThrowIfRateExceeded();

        //...
    }
}

RateLimiter 的实现取决于您的具体要求,但一个非常简单的非线程安全版本可能如下所示:

class RateLimiter
{
    private readonly int rate;
    private readonly TimeSpan perTime;

    private DateTime secondStart = DateTime.MinValue;
    private int count = 0;

    public RateLimiter(int rate, TimeSpan perTime)
    {
        this.rate = rate;
        this.perTime = perTime;
    }

    public void ThrowIfRateExceeded()
    {
        var now = DateTime.UtcNow;

        if (now - secondStart > perTime)
        {
            secondStart = now;
            count = 1;
            return;
        }

        if (count >= rate)
            throw new RateLimitExceededException();

        count++;
    }
}