使用 python aiohttp 限制每分钟的并发和控制请求?
limit concurreny and control requests per minute with python aiohttp?
有一款名为激战 2 的游戏,它为我们提供了 API 来查询游戏数据库中的几乎所有内容。我的目标是使用 python asyncio 和 aiohttp 编写一个简单的爬虫并从激战 2 游戏数据库中获取所有项目的信息。
我写了一个简短的程序,它可以工作,但它的行为有点奇怪,我想这是我对编写协同程序不理解的地方。
首先,我使用 Postman 应用程序发出了请求。而且,在响应 header 中,有 X-Rate-Limit-Limit,600。所以我猜请求限制在每分钟 600?
这是我的问题。
1、程序结束后。我检查了一些 JSON 文件,它们具有相同的内容
[{"name": "Endless Fractal Challenge Mote Tonic", "description": "Transform into a Challenge Mote for 15 minutes or until hit. You cannot move while transformed."......
这意味着请求得到了错误的响应,但我不知道为什么。
2、我试过asyncio.Semaphore,但是即使我限制并发为5,请求也很快超过600。所以我试图通过在 request_item 函数的末尾添加一个 time.sleep(0.2) 来控制时间。我猜 time.sleep(0.2) 会暂停整个 python 进程 0.2 秒,实际上,它起作用了,但是执行了一段时间后,程序挂了很长时间,然后给出了很多失败的尝试。每次自动重试仍然失败。我对这种行为感到困惑。
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
# retry for 3 times when exception occurs.
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
time.sleep(0.2)
当我将 time.sleep(0.2) 移动到 request_item 函数内的 for 循环中时,整个程序挂起。我不知道发生了什么。
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
time.sleep(0.2)
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
谁能稍微解释一下?还有更好的解决方案吗?
我认为有一些解决方案,但我无法测试。例如,获取 loop.time(),并为每 600 个请求暂停整个事件循环。或者,将 600 个请求添加到 task_list 并将它们收集为一个组,完成后,再次 asyncio.run(get_item(req_ids)) 再添加 600 个请求。
这是我的全部代码。
import aiohttp
import asyncio
import httpx
import json
import math
import os
import time
tk = 'xxxxxxxx'
url_template = 'https://api.guildwars2.com/v2/items'
# get items list
req_param = {'access_token': tk}
item_list_resp = httpx.get(url_template, params=req_param)
items = item_list_resp.json()
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
# since the game API limit requests, I think it's ok to suspend program for a while
time.sleep(0.2)
async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
await asyncio.gather(*task_list)
asyncio.run(get_item(req_ids))
您正在使用 time.sleep()
而不是 await asyncio.sleep()
。堵洞执行N秒,做错地方
事情是这样的。
当你 运行
for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
您只需安排您的请求,而不是 运行 安排它。 (例如,你有 1000 item_ids
)所以你安排了 1000 个任务,当你 运行 await asyncio.gather(*task_list)
你实际上等待所有这 1000 个任务将被执行。他们会立刻开火。
但是在每个任务中你 运行 time.sleep(0.2)
并且你必须等待 1000*0.2 秒。一次记住所有任务 运行,并且通常以随机顺序。所以你 运行 任务 1 并等待 0.2 秒,然后启动任务 2 并等待 0.2 秒,然后任务 999 启动并等待 0.2 秒等等。
最简单的解决方案是在发出 600 个请求后等待一分钟。你需要在 get_item
里面减速。示例代码(我没有测试):
async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for n, item_id in enumerate(item_ids):
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
if n % 600 == 0:
await asyncio.gather(*task_list)
await asyncio.sleep(60)
task_list = []
我推荐你使用一个库 asyncio-throttle。
PS。由于速率限制为每分钟 600,我认为您不需要 asyncio
,因为我非常确定 600 个并发请求将在 5-10 秒内执行。检查两次是否您的 600 请求使用经典 requests
线程超过 1 分钟。
有一款名为激战 2 的游戏,它为我们提供了 API 来查询游戏数据库中的几乎所有内容。我的目标是使用 python asyncio 和 aiohttp 编写一个简单的爬虫并从激战 2 游戏数据库中获取所有项目的信息。
我写了一个简短的程序,它可以工作,但它的行为有点奇怪,我想这是我对编写协同程序不理解的地方。
首先,我使用 Postman 应用程序发出了请求。而且,在响应 header 中,有 X-Rate-Limit-Limit,600。所以我猜请求限制在每分钟 600?
这是我的问题。
1、程序结束后。我检查了一些 JSON 文件,它们具有相同的内容
[{"name": "Endless Fractal Challenge Mote Tonic", "description": "Transform into a Challenge Mote for 15 minutes or until hit. You cannot move while transformed."......
这意味着请求得到了错误的响应,但我不知道为什么。
2、我试过asyncio.Semaphore,但是即使我限制并发为5,请求也很快超过600。所以我试图通过在 request_item 函数的末尾添加一个 time.sleep(0.2) 来控制时间。我猜 time.sleep(0.2) 会暂停整个 python 进程 0.2 秒,实际上,它起作用了,但是执行了一段时间后,程序挂了很长时间,然后给出了很多失败的尝试。每次自动重试仍然失败。我对这种行为感到困惑。
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
# retry for 3 times when exception occurs.
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
time.sleep(0.2)
当我将 time.sleep(0.2) 移动到 request_item 函数内的 for 循环中时,整个程序挂起。我不知道发生了什么。
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
time.sleep(0.2)
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
谁能稍微解释一下?还有更好的解决方案吗? 我认为有一些解决方案,但我无法测试。例如,获取 loop.time(),并为每 600 个请求暂停整个事件循环。或者,将 600 个请求添加到 task_list 并将它们收集为一个组,完成后,再次 asyncio.run(get_item(req_ids)) 再添加 600 个请求。
这是我的全部代码。
import aiohttp
import asyncio
import httpx
import json
import math
import os
import time
tk = 'xxxxxxxx'
url_template = 'https://api.guildwars2.com/v2/items'
# get items list
req_param = {'access_token': tk}
item_list_resp = httpx.get(url_template, params=req_param)
items = item_list_resp.json()
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
# since the game API limit requests, I think it's ok to suspend program for a while
time.sleep(0.2)
async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
await asyncio.gather(*task_list)
asyncio.run(get_item(req_ids))
您正在使用 time.sleep()
而不是 await asyncio.sleep()
。堵洞执行N秒,做错地方
事情是这样的。 当你 运行
for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
您只需安排您的请求,而不是 运行 安排它。 (例如,你有 1000 item_ids
)所以你安排了 1000 个任务,当你 运行 await asyncio.gather(*task_list)
你实际上等待所有这 1000 个任务将被执行。他们会立刻开火。
但是在每个任务中你 运行 time.sleep(0.2)
并且你必须等待 1000*0.2 秒。一次记住所有任务 运行,并且通常以随机顺序。所以你 运行 任务 1 并等待 0.2 秒,然后启动任务 2 并等待 0.2 秒,然后任务 999 启动并等待 0.2 秒等等。
最简单的解决方案是在发出 600 个请求后等待一分钟。你需要在 get_item
里面减速。示例代码(我没有测试):
async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for n, item_id in enumerate(item_ids):
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
if n % 600 == 0:
await asyncio.gather(*task_list)
await asyncio.sleep(60)
task_list = []
我推荐你使用一个库 asyncio-throttle。
PS。由于速率限制为每分钟 600,我认为您不需要 asyncio
,因为我非常确定 600 个并发请求将在 5-10 秒内执行。检查两次是否您的 600 请求使用经典 requests
线程超过 1 分钟。