Python API Rate Limiting - How to Limit API Calls Globally
I am trying to limit the API calls in my code. I already found a nice Python library, ratelimiter==1.0.2.post0
https://pypi.python.org/pypi/ratelimiter
However, this library can only rate-limit in a local scope, i.e. within a function or a loop:
from ratelimiter import RateLimiter

# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
    pass

# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)
for i in range(100):
    with rate_limiter:
        do_something()
Because I have several functions that make API calls in different places, I want to limit the API calls globally. For example, suppose I want to limit the API calls to one per second, and suppose I have functions x and y in which two API calls are made:
@rate(...)
def x():
    ...

@rate(...)
def y():
    ...
By decorating the functions with a limiter, I can rate-limit each of the two functions. However, if I execute the two functions sequentially, the number of API calls is not tracked globally, because the two limiters don't know about each other. So y will be called right after x finishes, without waiting another second, and that violates the one-call-per-second limit.
Is there any way or library that can be used to rate-limit globally in Python?
After all, I implemented my own Throttler class. By proxying every API request through its request method, we can keep track of all API requests. Taking the function to call as a parameter of the request method, it also caches the results to reduce the number of API calls.
import datetime
import logging
import time


class TooManyRequestsError(Exception):
    def __str__(self):
        return "More than 30 requests have been made in the last five seconds."


class Throttler(object):
    cache = {}

    def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
        # Dict of max number of requests of the API rate limit for each source
        self.max_rate = max_rate
        # Dict of duration of the API rate limit for each source
        self.window = window
        # Whether to throw an error (when True) if the limit is reached, or wait until another request
        self.throttle_stop = throttle_stop
        # The time, in seconds, for which to cache a response
        self.cache_age = cache_age
        # Initialization
        self.next_reset_at = dict()
        self.num_requests = dict()

        now = datetime.datetime.now()
        for source in self.max_rate:
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
            self.num_requests[source] = 0

    def request(self, source, method, do_cache=False):
        now = datetime.datetime.now()

        # if a cached response exists, no need to make the API call
        key = source + method.__name__
        if do_cache and key in self.cache:
            timestamp, data = self.cache.get(key)
            logging.info('{} exists in cache @ {}'.format(key, timestamp))
            if (now - timestamp).seconds < self.cache_age:
                logging.info('retrieved cache for {}'.format(key))
                return data

        # <--- MAKE API CALLS ---> #

        # reset the count if the period has passed
        if now > self.next_reset_at.get(source):
            self.num_requests[source] = 0
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

        # throttle the request
        def halt(wait_time):
            if self.throttle_stop:
                raise TooManyRequestsError()
            else:
                # Wait the required time, plus a bit of extra padding time.
                time.sleep(wait_time + 0.1)

        # if the max rate is exceeded, we need to wait
        if self.num_requests.get(source) >= self.max_rate.get(source):
            logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
            halt((self.next_reset_at.get(source) - now).seconds)

        self.num_requests[source] += 1
        response = method()  # potential exception raised here

        # cache the response
        if do_cache:
            self.cache[key] = (now, response)
            logging.info('cached instance for {}, {}'.format(source, method))

        return response
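To illustrate how the class is meant to be used, here is a minimal, hypothetical sketch (the "riot" source name, the limits, and the fetch_summoners helper are made up for illustration):

import requests

# Hypothetical limit: at most 30 requests per 5-second window for the "riot" source.
throttler = Throttler(max_rate={'riot': 30}, window={'riot': 5})

def fetch_summoners():
    # The callable takes no arguments; request() simply invokes it.
    return requests.get('https://example.com/api/summoners')

# Every call is proxied through request(), so the per-source count is tracked
# globally, and repeated calls within cache_age are served from the cache.
response = throttler.request('riot', fetch_summoners, do_cache=True)

Note that the callable must take no arguments, and when do_cache=True it must have a __name__ attribute, since the cache key is built from source + method.__name__.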
Many API providers restrict developers from making too many API calls.
The Python ratelimit package introduces a function decorator that prevents a function from being called more often than the API provider allows.

from ratelimit import limits
import requests

TIME_PERIOD = 900  # time period in seconds

@limits(calls=15, period=TIME_PERIOD)
def call_api(url):
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

Note: this function will not be able to make more than 15 API calls within a period of 15 minutes.
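For reference, when the decorated function is invoked more often than allowed, the ratelimit package raises a RateLimitException (unless something like the sleep_and_retry decorator mentioned in the next answer is used); a minimal sketch of catching it:

from ratelimit import RateLimitException

try:
    call_api('https://example.com/api')
except RateLimitException:
    # A 16th call was attempted within the 900-second window.
    print('rate limit reached, try again later')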
Adding to Sunil's answer, you need to add the @sleep_and_retry decorator, otherwise your code will break as soon as the rate limit is reached:

from ratelimit import limits, sleep_and_retry
import requests

@sleep_and_retry
@limits(calls=0.05, period=1)
def api_call(url, api_key):
    r = requests.get(
        url,
        headers={'X-Riot-Token': api_key}
    )
    if r.status_code != 200:
        raise Exception('API Response: {}'.format(r.status_code))
    return r
I ran into the same problem: I had a bunch of different functions calling the same API, and I wanted rate limiting to apply globally. What I ended up doing was creating an empty function with rate limiting enabled.
PS: I use a different rate-limiting library, found here: https://pypi.org/project/ratelimit/
from ratelimit import limits, sleep_and_retry

# 30 calls per minute
CALLS = 30
RATE_LIMIT = 60

@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
    ''' Empty function just to check for calls to API '''
    return
Then I simply call that function at the beginning of every function that calls the API:

def get_something_from_api(http_session, url):
    check_limit()
    response = http_session.get(url)
    return response
If the limit is reached, the program sleeps until (in my case) the 60 seconds have passed, and then resumes normally.
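Because every wrapper funnels through the same decorated check_limit(), the 30-calls-per-minute budget is shared across all of them; a sketch with two hypothetical endpoints:

def get_user(http_session, user_id):
    check_limit()  # counts against the same global budget
    return http_session.get('https://example.com/users/{}'.format(user_id))

def get_orders(http_session, user_id):
    check_limit()  # counts against the same global budget
    return http_session.get('https://example.com/users/{}/orders'.format(user_id))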
There are plenty of fancy libraries that provide nice decorators and special safety features, but the code below should work with django.core.cache or any other cache object that has get and set methods:
import logging

def hit_rate_limit(key, max_hits, max_hits_interval):
    '''Implement a basic rate throttler. Prevent more than max_hits occurring
    within max_hits_interval time period (seconds).'''
    # Use the django cache, but can be any object with get/set
    from django.core.cache import cache
    hit_count = cache.get(key) or 0
    logging.info("Rate Limit: %s --> %s", key, hit_count)
    if hit_count > max_hits:
        return True
    cache.set(key, hit_count + 1, max_hits_interval)
    return False
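One possible way to use it (the 'my-api' key, the call_api_politely wrapper, and the back-off loop are illustrative, not part of the original snippet):

import time
import requests

def call_api_politely(url):
    # Back off until the shared counter in the cache allows another hit.
    while hit_rate_limit('my-api', max_hits=30, max_hits_interval=60):
        time.sleep(1)
    return requests.get(url)

Because the counter lives in the cache rather than in the process, the limit is shared by every call site, and with a shared backend such as memcached or Redis it also holds across processes.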
Using the Python standard library:
import threading
from time import time, sleep

b = threading.Barrier(2)

def belay(s=1):
    """Block the main thread for `s` seconds."""
    while True:
        b.wait()
        sleep(s)

def request_something():
    b.wait()
    print(f'something at {time()}')

def request_other():
    b.wait()
    print(f'or other at {time()}')

if __name__ == '__main__':
    thread = threading.Thread(target=belay)
    thread.daemon = True
    thread.start()

    # request a lot of things
    i = 0
    while (i := i + 1) < 5:
        request_something()
        request_other()
There will be approximately s seconds between each printed timestamp. Because the main thread waits rather than sleeps, the time it spends responding to requests is independent of the (minimum) time between requests.