分布式出站 http 速率限制器
Distributed outbound http rate limiter
我有一个微服务架构应用程序,其中有多个服务轮询外部 API。外部 API 的速率限制为每分钟 600 个请求。我怎样才能让我的所有实例一起保持低于共享的 600 速率限制?
Google只给我带来了3个解决方案,最有希望的是:
- myntra/golimit 三者中最有前途的,但我真的不知道如何设置它。
- wallstreetcn/rate 似乎只有在达到限制时才会拒绝(我的应用程序需要等到它可以发出请求)并且
rate.NewLimiter
func 中的 Every
函数似乎成为一个不同的导入/依赖项,我无法弄清楚它是什么
- manavo/go-rate-limiter 有一个 "soft" 限制,显然,这可能会让我超过限制。有些端点如果我不能访问它们几秒钟我真的不介意,但其他端点请求应该尽可能工作。
目前我有一个业余的解决方案。下面的代码允许我设置每分钟的限制,它会在请求之间休眠以在一分钟内传播请求。此客户端速率限制是针对每个实例的,因此我必须通过硬编码将 600 个请求除以实例数量。
var semaphore = make(chan struct{}, 5)
var rate = make(chan struct{}, 10)
func init(){
// leaky bucket
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-rate
// if this isn't going to run indefinitely, signal
// this to return by closing the rate channel.
if !ok {
return
}
}
}()
并在发出 http API 请求的函数内部。
rate <- struct{}{}
// check the concurrency semaphore
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
我怎样才能让我的所有实例一起保持低于共享的 600 速率限制?
首选项:
- 基于一个键的速率限制计数器,因此可以设置多个计数器。
- 在设定的持续时间内分散请求,这样 600 个请求不会在前 30 秒内发送,而是在整分钟持续时间内发送。
如果你想要一个全局速率限制器,你需要一个地方来维护分布式状态,比如zookeeper。通常,我们不想支付间接费用。或者,您可以设置一个正向代理(https://golang.org/pkg/net/http/httputil/#ReverseProxy),在其中进行速率限制。
我无法与您找到的库对话,但是 leaky bucket 速率限制器非常简单。您需要某种共享事务存储。每个桶(或速率限制器)然后只是一个整数和一个时间值。整数是特定时间桶中的滴数。每次必须应用速率限制时,减去自上次更新以来泄漏的滴数,然后加一个,然后检查滴数是否在桶的容量范围内。
我们正在使用 Redis 来做这类事情。要在 Redis 中实现此事务,需要一个脚本(参见 SCRIPT LOAD and EVALSHA)。例如,在 SQL 数据库中,一个 SELECT FOR UPDATE
后跟一个 UPDATE
语句可以达到同样的效果。这是我们的 Redis 脚本:
-- replicate_commands allows us to use the TIME command. We depend on accurate
-- (and reasonably consistent) timestamps. Multiple clients may have
-- inacceptable clock drift.
redis.replicate_commands()
local rate = tonumber(ARGV[1]) -- how many drops leak away in one second
local cap = tonumber(ARGV[2]) -- how many drops fit in the bucket
local now, _ = unpack(redis.call('TIME'))
-- A bucket is represented by a hash with two keys, n and t. n is the number of
-- drops in the bucket at time t (seconds since epoch).
local xs = redis.call('HMGET', KEYS[1], 'n', 't')
local n = tonumber(xs[1])
local t = tonumber(xs[2])
if type(n) ~= "number" or type(t) ~= "number" then
-- The bucket doesn't exist yet (n and t are false), or someone messed with
-- our hash values. Either way, pretend the bucket is empty.
n, t = 0, now
end
-- remove drops that leaked since t
n = n - (now-t)*rate
if n < 0 then
n = 0
end
-- add one drop if it fits
if n < cap then
n = n + 1
else
n = cap
end
redis.call('HMSET', KEYS[1], 'n', n, 't', now)
redis.call('EXPIRE', KEYS[1], math.floor(n/rate) + 1)
return n
示例调用每秒 10 滴,容量为 10 滴:
EVALSHA <SHA_IN_HEX> 1 rate-limit:my-bucket 10 10
脚本returns桶中的滴数。如果该数字等于容量,您可以休眠一小段时间然后重试,或者直接拒绝请求,具体取决于您的要求。
请注意脚本永远不会 returns 大于容量的值,因此在您的情况下恢复时间不超过十分之一秒。这可能不是您所需要的,因为您正在尝试匹配 third-party 速率限制器。 IE。您可能对溢出桶没问题,导致请求爆发后恢复时间更长。
我有一个微服务架构应用程序,其中有多个服务轮询外部 API。外部 API 的速率限制为每分钟 600 个请求。我怎样才能让我的所有实例一起保持低于共享的 600 速率限制?
Google只给我带来了3个解决方案,最有希望的是:
- myntra/golimit 三者中最有前途的,但我真的不知道如何设置它。
- wallstreetcn/rate 似乎只有在达到限制时才会拒绝(我的应用程序需要等到它可以发出请求)并且
rate.NewLimiter
func 中的Every
函数似乎成为一个不同的导入/依赖项,我无法弄清楚它是什么 - manavo/go-rate-limiter 有一个 "soft" 限制,显然,这可能会让我超过限制。有些端点如果我不能访问它们几秒钟我真的不介意,但其他端点请求应该尽可能工作。
目前我有一个业余的解决方案。下面的代码允许我设置每分钟的限制,它会在请求之间休眠以在一分钟内传播请求。此客户端速率限制是针对每个实例的,因此我必须通过硬编码将 600 个请求除以实例数量。
var semaphore = make(chan struct{}, 5)
var rate = make(chan struct{}, 10)
func init(){
// leaky bucket
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-rate
// if this isn't going to run indefinitely, signal
// this to return by closing the rate channel.
if !ok {
return
}
}
}()
并在发出 http API 请求的函数内部。
rate <- struct{}{}
// check the concurrency semaphore
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
我怎样才能让我的所有实例一起保持低于共享的 600 速率限制?
首选项: - 基于一个键的速率限制计数器,因此可以设置多个计数器。 - 在设定的持续时间内分散请求,这样 600 个请求不会在前 30 秒内发送,而是在整分钟持续时间内发送。
如果你想要一个全局速率限制器,你需要一个地方来维护分布式状态,比如zookeeper。通常,我们不想支付间接费用。或者,您可以设置一个正向代理(https://golang.org/pkg/net/http/httputil/#ReverseProxy),在其中进行速率限制。
我无法与您找到的库对话,但是 leaky bucket 速率限制器非常简单。您需要某种共享事务存储。每个桶(或速率限制器)然后只是一个整数和一个时间值。整数是特定时间桶中的滴数。每次必须应用速率限制时,减去自上次更新以来泄漏的滴数,然后加一个,然后检查滴数是否在桶的容量范围内。
我们正在使用 Redis 来做这类事情。要在 Redis 中实现此事务,需要一个脚本(参见 SCRIPT LOAD and EVALSHA)。例如,在 SQL 数据库中,一个 SELECT FOR UPDATE
后跟一个 UPDATE
语句可以达到同样的效果。这是我们的 Redis 脚本:
-- replicate_commands allows us to use the TIME command. We depend on accurate
-- (and reasonably consistent) timestamps. Multiple clients may have
-- inacceptable clock drift.
redis.replicate_commands()
local rate = tonumber(ARGV[1]) -- how many drops leak away in one second
local cap = tonumber(ARGV[2]) -- how many drops fit in the bucket
local now, _ = unpack(redis.call('TIME'))
-- A bucket is represented by a hash with two keys, n and t. n is the number of
-- drops in the bucket at time t (seconds since epoch).
local xs = redis.call('HMGET', KEYS[1], 'n', 't')
local n = tonumber(xs[1])
local t = tonumber(xs[2])
if type(n) ~= "number" or type(t) ~= "number" then
-- The bucket doesn't exist yet (n and t are false), or someone messed with
-- our hash values. Either way, pretend the bucket is empty.
n, t = 0, now
end
-- remove drops that leaked since t
n = n - (now-t)*rate
if n < 0 then
n = 0
end
-- add one drop if it fits
if n < cap then
n = n + 1
else
n = cap
end
redis.call('HMSET', KEYS[1], 'n', n, 't', now)
redis.call('EXPIRE', KEYS[1], math.floor(n/rate) + 1)
return n
示例调用每秒 10 滴,容量为 10 滴:
EVALSHA <SHA_IN_HEX> 1 rate-limit:my-bucket 10 10
脚本returns桶中的滴数。如果该数字等于容量,您可以休眠一小段时间然后重试,或者直接拒绝请求,具体取决于您的要求。
请注意脚本永远不会 returns 大于容量的值,因此在您的情况下恢复时间不超过十分之一秒。这可能不是您所需要的,因为您正在尝试匹配 third-party 速率限制器。 IE。您可能对溢出桶没问题,导致请求爆发后恢复时间更长。