Parallelize calls to an API with a hard limit per minute
I am trying to parallelize calls to an API. The API has a limit of 1,200 calls per minute before it cuts you off. What is the most efficient async approach for staying under that limit?
def remove_html_tags(text):
    """Remove html tags from a string"""
    import re
    clean = re.compile('<.*?>')
    return re.sub(clean, ' ', text)

async def getRez(df, url):
    async with aiohttp.ClientSession() as session:
        auth = aiohttp.BasicAuth('username', pwd)
        r = await session.get(url, auth=auth)
        if r.status == 200:
            content = await r.text()
            text = remove_html_tags(str(content))
        else:
            text = '500 Server Error'
        df.loc[df['url'] == url, ['RezText']] = [[text]]
        df['wordCount'] = df['RezText'].apply(lambda x: len(str(x).split(" ")))
        data = df[df["RezText"] != "500 Server Error"]

async def main(df):
    df['RezText'] = None
    await asyncio.gather(*[getRez(df, url) for url in df['url']])

loop = asyncio.get_event_loop()
loop.run_until_complete(main(data))
1,200 calls per minute works out to 20 calls per second, so you can split the requests into batches of 20 and sleep for one second between batches.
Another option is to use aiohttp.TCPConnector(limit=20) for the client session, but that only limits the number of concurrent requests, so you might end up making more calls (if the API responds in under a second) or fewer (if the API responds in more than a second); see the related question.
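For illustration, here is a minimal sketch of that connector-based alternative (the httpbin URLs are placeholders; note this caps requests in flight at 20 but does not enforce 20 per second):

# python 3.7+ sketch: limit concurrency (not rate) with TCPConnector
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    # at most 20 connections in flight at any moment
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        urls = ["http://httpbin.org/anything?key=a%d" % i for i in range(100)]
        responses = await asyncio.gather(*(fetch(session, url) for url in urls))
    print("got %d responses" % len(responses))

asyncio.run(main())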
Batching example:
# python 3.7+
import aiohttp
import asyncio

async def fetch(session, url):
    data = None
    async with session.get(url) as response:
        if response.status != 200:
            text = await response.text()
            print("cannot retrieve %s: status: %d, reason: %s" % (url, response.status, text))
        else:
            data = await response.json()
    return data

async def main(n):
    print("starting")
    session = aiohttp.ClientSession()
    tasks = []
    batch = []
    for i in range(n):
        batch.append("http://httpbin.org/anything?key=a%d" % i)
        if len(batch) >= 20:
            print("issuing batch %d:%d" % (i - 20 + 1, i + 1))
            for url in batch:
                task = asyncio.create_task(fetch(session, url))
                tasks.append(task)
            batch = []
            await asyncio.sleep(1)
    if batch:  # if the batch size does not divide n evenly, consume the last batch
        print("issuing last batch %d:%d" % (n - len(batch), n))
        for url in batch:
            task = asyncio.create_task(fetch(session, url))
            tasks.append(task)
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    await session.close()
    for response in responses:
        assert "args" in response
    # note that the responses will be in the order in which the requests were made
    print("finished")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(111))
Output:
starting
issuing batch 0:20
issuing batch 20:40
issuing batch 40:60
issuing batch 60:80
issuing batch 80:100
issuing last batch 100:111
finished
The important bits here are asyncio.create_task (which creates a task and starts it, returning a Task object), await asyncio.sleep(1) (used to throttle the requests), and await asyncio.gather (which waits for all the tasks to finish running).
For Python < 3.7 you can use asyncio.ensure_future instead of asyncio.create_task.
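As a stand-alone sketch of the pre-3.7 spelling (work() is a placeholder coroutine, not taken from the question):

# python 3.6 sketch: asyncio.ensure_future in place of asyncio.create_task
import asyncio

async def work(i):
    # placeholder coroutine standing in for fetch()
    await asyncio.sleep(0.1)
    return i

async def main():
    # ensure_future schedules the coroutine as a Task, like create_task in 3.7+
    tasks = [asyncio.ensure_future(work(i)) for i in range(5)]
    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
print(loop.run_until_complete(main()))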