Trying to add throttle control to parallelized API calls in Python
I'm using the Google Places API, which has a queries-per-second (QPS) limit of 10, meaning I can't make more than 10 requests within any one-second window. With serial execution this wouldn't be an issue, since the API's average response time is 250 ms, so I'd only manage about 4 calls per second anyway.
To use the full 10 QPS allowance, I switched to multithreading and made the API calls in parallel. But now I need to control how many calls can happen within one second: it must not exceed 10 (the Google API starts throwing errors if I cross the limit).
Below is the code I have so far, and I can't figure out why the program sometimes gets stuck, or takes much longer than it should.
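The constraint I'm after can be sketched as a lock-protected sliding window over call timestamps: remember when the last N calls happened, and make a new caller sleep until the oldest timestamp falls outside the one-second window. A minimal illustration (the class and names here are made up for the sketch, not my actual code):

```python
import threading
import time
from collections import deque

class SlidingWindowLimiter:
    """Allow at most max_calls within any `period`-second window."""

    def __init__(self, max_calls=10, period=1.0):
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()          # timestamps of recent calls
        self.lock = threading.Lock()

    def acquire(self):
        while True:
            with self.lock:
                now = time.monotonic()
                # drop timestamps that have aged out of the window
                while self.calls and now - self.calls[0] >= self.period:
                    self.calls.popleft()
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # window is full: wait until the oldest call expires
                wait = self.period - (now - self.calls[0])
            # sleep outside the lock so other threads aren't blocked
            time.sleep(wait)
```

Each worker thread would call `limiter.acquire()` immediately before issuing its request; the lock keeps the timestamp bookkeeping consistent across threads.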
import time
from datetime import datetime
import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor as pool
import concurrent.futures
import requests
import matplotlib.pyplot as plt
from statistics import mean
from ratelimiter import RateLimiter
def make_parallel(func, qps=10):
    lock = Lock()
    threads_execution_que = []
    limit_hit = False

    def qps_manager(arg):
        current_second = time.time()
        lock.acquire()
        if len(threads_execution_que) >= qps or limit_hit:
            limit_hit = True
            if current_second - threads_execution_que[0] <= 1:
                time.sleep(current_second - threads_execution_que[0])
        current_time = time.time()
        threads_execution_que.append(current_time)
        lock.release()

        res = func(arg)

        lock.acquire()
        threads_execution_que.remove(current_time)
        lock.release()
        return res

    def wrapper(iterable, number_of_workers=12):
        result = []
        with pool(max_workers=number_of_workers) as executer:
            bag = {executer.submit(func, i): i for i in iterable}
            for future in concurrent.futures.as_completed(bag):
                result.append(future.result())
        return result

    return wrapper

@make_parallel
def api_call(i):
    min_func_time = random.uniform(.25, .3)
    start_time = time.time()
    try:
        response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=1)
    except Exception as e:
        response = e
    if (time.time() - start_time) - min_func_time < 0:
        time.sleep(min_func_time - (time.time() - start_time))
    return response
api_call([1]*50)
Ideally the code shouldn't take more than about 1.5 seconds, but currently it takes roughly 12-14 seconds. As soon as I remove the QPS manager logic, the script speeds back up to the expected speed.
Please do point out what I'm doing wrong, and also tell me if any existing package already does this out of the box.
It looks like ratelimit does exactly that:
from ratelimit import limits, sleep_and_retry

@make_parallel
@sleep_and_retry
@limits(calls=10, period=1)
def api_call(i):
    try:
        response = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=1)
    except Exception as e:
        response = e
    return response
Edit: I ran a few tests, and it looks like @sleep_and_retry is slightly too optimistic, so increase the period a little, to 1.2 seconds:
from datetime import datetime, timedelta

s = datetime.now()
api_call([1] * 50)
elapsed_time = datetime.now() - s
print(elapsed_time > timedelta(seconds=50 / 10))
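For completeness, here is a stdlib-only sketch of the decorator pattern from the question. One thing worth noting about the original code: `wrapper` submits `func` directly to the executor, so `qps_manager` is never actually invoked. A variant that submits the throttled wrapper instead might look like this (illustrative only, not a drop-in fix):

```python
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed

def make_parallel(func, qps=10):
    lock = threading.Lock()
    stamps = deque()  # timestamps of calls made in the current window

    def throttled(arg):
        # block until a slot is free in the one-second window
        while True:
            with lock:
                now = time.monotonic()
                while stamps and now - stamps[0] >= 1.0:
                    stamps.popleft()
                if len(stamps) < qps:
                    stamps.append(now)
                    break
                wait = 1.0 - (now - stamps[0])
            time.sleep(wait)  # sleep outside the lock
        return func(arg)

    def wrapper(iterable, number_of_workers=12):
        results = []
        with ThreadPoolExecutor(max_workers=number_of_workers) as executor:
            # submit the throttled wrapper, not func itself
            futures = [executor.submit(throttled, i) for i in iterable]
            for future in as_completed(futures):
                results.append(future.result())
        return results

    return wrapper
```

Note the use of `time.monotonic()` for the window bookkeeping, which is immune to wall-clock adjustments that can confuse `time.time()`-based timing.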