Python API 速率限制 - 如何在全球范围内限制 API 呼叫

Python API Rate Limiting - How to Limit API Calls Globally

我试图在我的代码中限制 API 调用。我已经找到了一个不错的 python 库 ratelimiter==1.0.2.post0 https://pypi.python.org/pypi/ratelimiter

但是,该库只能在本地范围内限制速率。即)在函数和循环中

# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
    pass


# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)

for i in range(100):
    with rate_limiter:
        do_something()

因为我有几个函数,它们在不同的地方进行 API 调用,所以我想在 global 范围内限制 API 调用。

例如,假设我想将 API 的调用限制为 每秒一次 。而且,假设我有函数 xy,其中进行了两次 API 调用。

@rate(...)
def x():
   ...

@rate(...)
def y():
   ...

通过用 limiter 修饰函数,我可以限制这两个函数的速率。

但是,如果我按顺序执行上述两个函数,它就会失去对 global 范围内 API 调用次数的跟踪,因为它们彼此不知道。因此,y 将在 x 执行后立即调用,无需再等待一秒钟。而且,这将违反 每秒一次 限制。

有什么方法或库可以用来限制全局在python中的速率吗?

毕竟我自己实现了Throttlerclass。通过将每个 API 请求代理到 request 方法,我们可以跟踪所有 API 请求。利用传递函数作为 request 方法参数,它还缓存结果以减少 API 次调用。

class TooManyRequestsError(Exception):
    def __str__(self):
        return "More than 30 requests have been made in the last five seconds."


class Throttler(object):
    cache = {}

    def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
        # Dict of max number of requests of the API rate limit for each source
        self.max_rate = max_rate
        # Dict of duration of the API rate limit for each source
        self.window = window
        # Whether to throw an error (when True) if the limit is reached, or wait until another request
        self.throttle_stop = throttle_stop
        # The time, in seconds, for which to cache a response
        self.cache_age = cache_age
        # Initialization
        self.next_reset_at = dict()
        self.num_requests = dict()

        now = datetime.datetime.now()
        for source in self.max_rate:
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
            self.num_requests[source] = 0

    def request(self, source, method, do_cache=False):
        now = datetime.datetime.now()

        # if cache exists, no need to make api call
        key = source + method.func_name
        if do_cache and key in self.cache:
            timestamp, data = self.cache.get(key)
            logging.info('{} exists in cached @ {}'.format(key, timestamp))

            if (now - timestamp).seconds < self.cache_age:
                logging.info('retrieved cache for {}'.format(key))
                return data

        # <--- MAKE API CALLS ---> #

        # reset the count if the period passed
        if now > self.next_reset_at.get(source):
            self.num_requests[source] = 0
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

        # throttle request
        def halt(wait_time):
            if self.throttle_stop:
                raise TooManyRequestsError()
            else:
                # Wait the required time, plus a bit of extra padding time.
                time.sleep(wait_time + 0.1)

        # if exceed max rate, need to wait
        if self.num_requests.get(source) >= self.max_rate.get(source):
            logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
            halt((self.next_reset_at.get(source) - now).seconds)

        self.num_requests[source] += 1
        response = method()  # potential exception raise

        # cache the response
        if do_cache:
            self.cache[key] = (now, response)
            logging.info('cached instance for {}, {}'.format(source, method))

        return response

许多 API 提供商限制开发人员进行过多的 API 调用。

Python ratelimit 包引入了一个函数装饰器,防止函数被调用的次数超过 API 提供程序允许的次数。

来自 ratelimit 导入限制

import requests
TIME_PERIOD = 900   # time period in seconds

@limits(calls=15, period=TIME_PERIOD)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

注意:此功能在 15 分钟内不能进行超过 15 API 次调用。

添加到 Sunil 答案中,您需要添加 @sleep_and_retry 装饰器,否则您的代码将在达到速率限制时中断:

@sleep_and_retry
@limits(calls=0.05, period=1)
def api_call(url, api_key):
    r = requests.get(
        url,
        headers={'X-Riot-Token': api_key}
        )
    if r.status_code != 200:
        raise Exception('API Response: {}'.format(r.status_code))
    return r

我遇到了同样的问题,我有一堆调用相同 API 的不同函数,我想在全球范围内进行速率限制。我最终做的是创建一个启用速率限制的空函数。

PS: 我使用在这里找到的不同速率限制库:https://pypi.org/project/ratelimit/

from ratelimit import limits, sleep_and_retry

# 30 calls per minute
CALLS = 30
RATE_LIMIT = 60

@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
''' Empty function just to check for calls to API '''
return

然后我就在调用 API:

的每个函数的开头调用该函数
def get_something_from_api(http_session, url):
    check_limit()
    response = http_session.get(url)
    return response

如果达到限制,程序将休眠直到(在我的例子中)60 秒过去,然后正常恢复。

有很多精美的库可以提供漂亮的装饰器和特殊的安全功能,但下面的库应该可以与 django.core.cache 或任何其他带有 getset 的缓存一起使用方法:

def hit_rate_limit(key, max_hits, max_hits_interval):
    '''Implement a basic rate throttler. Prevent more than max_hits occurring
    within max_hits_interval time period (seconds).'''
    # Use the django cache, but can be any object with get/set
    from django.core.cache import cache
    hit_count = cache.get(key) or 0
    logging.info("Rate Limit: %s --> %s", key, hit_count)
    if hit_count > max_hits:
        return True
    cache.set(key, hit_count + 1, max_hits_interval)
    return False

使用 Python 标准库:

import threading
from time import time, sleep

b = threading.Barrier(2)

def belay(s=1):
    """Block the main thread for `s` seconds."""
    while True:
        b.wait()
        sleep(s)

def request_something():
    b.wait()
    print(f'something at {time()}')

def request_other():
    b.wait()
    print(f'or other at {time()}')
    

if __name__ == '__main__':

    thread = threading.Thread(target=belay)
    thread.daemon = True
    thread.start()

    # request a lot of things
    i = 0
    while (i := i+1) < 5:
        request_something()
        request_other()

打印的每个时间戳之间大约有 s 秒。因为主线程等待而不是休眠,所以它响应请求所花费的时间与请求之间的(最小)时间无关。