在漏桶算法中,当队列未满时实现固定速率的正确逻辑是什么?

In leaky-bucket algorithm, when the queue is not full what's the correct logic to achieve fixed rate?

我正在学习漏桶算法,想通过使用 redis 和 golang http 编写一些简单的代码来亲自动手。

当我在这里用关键词redis, leaky, bucket搜索的时候。 [1]中有很多类似的问题,这很好。但是,我发现在浏览了这些线程和 wiki[2] 之后,我无法理解整个逻辑。我想有些事情我不明白,也没有意识到。所以我想在这里重新表述一下;如果我说错了请指正。

伪代码:

key := "ip address, token or anything that can be the representative of a client"
redis_queue_size := 5
interval_between_each_request := 7
request := obtain_http_request_from_somewhere()

if check_current_queue_size() < redis_queue_size:
    if is_queue_empty()
        add_request_to_the_queue() // zadd "ip1" now() now() // now() is something like seconds, milliseconds or nanoseconds e.g. t = 1
        process_request(request)
    else
        now := get_current_time()
        // add_request_to_... retrieves the first element in the queue
        // compute the expected timestamp to execute the request and its current time
        // e.g. zadd "ip1" <time of the first elment in the queue + interval_between_each_request> now
        add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // e.g. zadd "ip" <timestamp as score> <timestamp a request is allowed to be executed>
        // Below function check_the_time_left...() will check how many time left at which the current request need to wait. 
        // For instance, the first request stored in the queue with the command
        //    zadd "ip1" 1 1  // t = 1
        // and the second request arrives at t = 4 but it is allowed t be executed at t = 8
        //    zadd "ip1" 8 4 // where 4 := now, 8 := 1 + interval_between_each_request
        // so the N will be 4 
        N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request) 
        sleep(N) // now the request wait for 4 seconds before processing the request
        process_request(http_request_obj)
else
    return // discard request

我理解的部分是当队列已满时,接下来的请求将被丢弃。但是我想我可能会误解当队列未满时,如何重塑传入请求以便它可以以固定速率执行。

我很感激任何建议

[1]。 https://whosebug.com/search?q=redis+leaky+bucket+&s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e

[2]。 https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue

有多种方法可以实现漏桶,但该过程应该分为两个独立的部分。一种是将东西放入桶中,另一种是如果有任何东西要移除,则按设定的时间间隔将它们移除。

您可以使用单独的 goroutine 以设定的时间间隔使用消息。这将简化您的代码,因为在一条代码路径上您只需查看队列大小并丢弃数据包,而另一条代码路径将只消耗现有的一切。

  1. 如果这是为了简单的速率限制,我们看到大多数 Redis 用户都采用使用排序集的滑动 window 方法 https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py

  2. 如果您设置在漏桶上,您可以考虑使用每个消费者 ID(apiToken/IP 地址等)的 redis 流,如下所示

请求进入 consumerID

XADD 请求-[consumerID] MAXLEN [BUCKET SIZE]

如有必要,为该 consumerID 生成一个 go routine 获取当前时间 如果 requests-[consumerID] 的 XLEN 为 0 exit go routine

XREAD COUNT [number_of_requests_per_period] BLOCK [时间段 - 1 ms] STREAMS 请求 - [consumerID] 获取当前时间并在剩余时间段内休眠

https://redis.io/commands#stream 详细介绍流的工作方式