在漏桶算法中,当队列未满时实现固定速率的正确逻辑是什么?
In leaky-bucket algorithm, when the queue is not full what's the correct logic to achieve fixed rate?
我正在学习漏桶算法,想通过使用 redis 和 golang http 编写一些简单的代码来亲自动手。
当我在这里用关键词redis, leaky, bucket搜索的时候。 [1]中有很多类似的问题,这很好。但是,我发现在浏览了这些线程和 wiki[2] 之后,我无法理解整个逻辑。我想有些事情我不明白,也没有意识到。所以我想在这里重新表述一下;如果我说错了请指正。
伪代码:
key := "ip address, token or anything that can be the representative of a client"
redis_queue_size := 5
interval_between_each_request := 7
request := obtain_http_request_from_somewhere()
if check_current_queue_size() < redis_queue_size:
if is_queue_empty()
add_request_to_the_queue() // zadd "ip1" now() now() // now() is something like seconds, milliseconds or nanoseconds e.g. t = 1
process_request(request)
else
now := get_current_time()
// add_request_to_... retrieves the first element in the queue
// compute the expected timestamp to execute the request and its current time
// e.g. zadd "ip1" <time of the first elment in the queue + interval_between_each_request> now
add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // e.g. zadd "ip" <timestamp as score> <timestamp a request is allowed to be executed>
// Below function check_the_time_left...() will check how many time left at which the current request need to wait.
// For instance, the first request stored in the queue with the command
// zadd "ip1" 1 1 // t = 1
// and the second request arrives at t = 4 but it is allowed t be executed at t = 8
// zadd "ip1" 8 4 // where 4 := now, 8 := 1 + interval_between_each_request
// so the N will be 4
N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request)
sleep(N) // now the request wait for 4 seconds before processing the request
process_request(http_request_obj)
else
return // discard request
我理解的部分是当队列已满时,接下来的请求将被丢弃。但是我想我可能会误解当队列未满时,如何重塑传入请求以便它可以以固定速率执行。
我很感激任何建议
[1]。 https://whosebug.com/search?q=redis+leaky+bucket+&s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e
有多种方法可以实现漏桶,但该过程应该分为两个独立的部分。一种是将东西放入桶中,另一种是如果有任何东西要移除,则按设定的时间间隔将它们移除。
您可以使用单独的 goroutine 以设定的时间间隔使用消息。这将简化您的代码,因为在一条代码路径上您只需查看队列大小并丢弃数据包,而另一条代码路径将只消耗现有的一切。
如果这是为了简单的速率限制,我们看到大多数 Redis 用户都采用使用排序集的滑动 window 方法 https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py
如果您设置在漏桶上,您可以考虑使用每个消费者 ID(apiToken/IP 地址等)的 redis 流,如下所示
请求进入 consumerID
XADD 请求-[consumerID] MAXLEN [BUCKET SIZE]
如有必要,为该 consumerID 生成一个 go routine
获取当前时间
如果 requests-[consumerID] 的 XLEN 为 0 exit go routine
XREAD COUNT [number_of_requests_per_period] BLOCK [时间段 - 1 ms] STREAMS 请求 - [consumerID]
获取当前时间并在剩余时间段内休眠
https://redis.io/commands#stream 详细介绍流的工作方式
我正在学习漏桶算法,想通过使用 redis 和 golang http 编写一些简单的代码来亲自动手。
当我在这里用关键词redis, leaky, bucket搜索的时候。 [1]中有很多类似的问题,这很好。但是,我发现在浏览了这些线程和 wiki[2] 之后,我无法理解整个逻辑。我想有些事情我不明白,也没有意识到。所以我想在这里重新表述一下;如果我说错了请指正。
伪代码:
key := "ip address, token or anything that can be the representative of a client"
redis_queue_size := 5
interval_between_each_request := 7
request := obtain_http_request_from_somewhere()
if check_current_queue_size() < redis_queue_size:
if is_queue_empty()
add_request_to_the_queue() // zadd "ip1" now() now() // now() is something like seconds, milliseconds or nanoseconds e.g. t = 1
process_request(request)
else
now := get_current_time()
// add_request_to_... retrieves the first element in the queue
// compute the expected timestamp to execute the request and its current time
// e.g. zadd "ip1" <time of the first elment in the queue + interval_between_each_request> now
add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // e.g. zadd "ip" <timestamp as score> <timestamp a request is allowed to be executed>
// Below function check_the_time_left...() will check how many time left at which the current request need to wait.
// For instance, the first request stored in the queue with the command
// zadd "ip1" 1 1 // t = 1
// and the second request arrives at t = 4 but it is allowed t be executed at t = 8
// zadd "ip1" 8 4 // where 4 := now, 8 := 1 + interval_between_each_request
// so the N will be 4
N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request)
sleep(N) // now the request wait for 4 seconds before processing the request
process_request(http_request_obj)
else
return // discard request
我理解的部分是当队列已满时,接下来的请求将被丢弃。但是我想我可能会误解当队列未满时,如何重塑传入请求以便它可以以固定速率执行。
我很感激任何建议
[1]。 https://whosebug.com/search?q=redis+leaky+bucket+&s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e
有多种方法可以实现漏桶,但该过程应该分为两个独立的部分。一种是将东西放入桶中,另一种是如果有任何东西要移除,则按设定的时间间隔将它们移除。
您可以使用单独的 goroutine 以设定的时间间隔使用消息。这将简化您的代码,因为在一条代码路径上您只需查看队列大小并丢弃数据包,而另一条代码路径将只消耗现有的一切。
如果这是为了简单的速率限制,我们看到大多数 Redis 用户都采用使用排序集的滑动 window 方法 https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py
如果您设置在漏桶上,您可以考虑使用每个消费者 ID(apiToken/IP 地址等)的 redis 流,如下所示
请求进入 consumerID
XADD 请求-[consumerID] MAXLEN [BUCKET SIZE]
如有必要,为该 consumerID 生成一个 go routine 获取当前时间 如果 requests-[consumerID] 的 XLEN 为 0 exit go routine
XREAD COUNT [number_of_requests_per_period] BLOCK [时间段 - 1 ms] STREAMS 请求 - [consumerID] 获取当前时间并在剩余时间段内休眠
https://redis.io/commands#stream 详细介绍流的工作方式