使用 python aiohttp 限制每分钟的并发和控制请求?

limit concurreny and control requests per minute with python aiohttp?

有一款名为激战 2 的游戏,它为我们提供了 API 来查询游戏数据库中的几乎所有内容。我的目标是使用 python asyncio 和 aiohttp 编写一个简单的爬虫并从激战 2 游戏数据库中获取所有项目的信息。

我写了一个简短的程序,它可以工作,但它的行为有点奇怪,我想这是我对编写协同程序不理解的地方。

首先,我使用 Postman 应用程序发出了请求。而且,在响应 header 中,有 X-Rate-Limit-Limit,600。所以我猜请求限制在每分钟 600?

这是我的问题。

1、程序结束后。我检查了一些 JSON 文件,它们具有相同的内容

[{"name": "Endless Fractal Challenge Mote Tonic", "description": "Transform into a Challenge Mote for 15 minutes or until hit. You cannot move while transformed."......

这意味着请求得到了错误的响应,但我不知道为什么。

2、我试过asyncio.Semaphore,但是即使我限制并发为5,请求也很快超过600。所以我试图通过在 request_item 函数的末尾添加一个 time.sleep(0.2) 来控制时间。我猜 time.sleep(0.2) 会暂停整个 python 进程 0.2 秒,实际上,它起作用了,但是执行了一段时间后,程序挂了很长时间,然后给出了很多失败的尝试。每次自动重试仍然失败。我对这种行为感到困惑。

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    # retry for 3 times when exception occurs.
    for i in range(3):
        try:
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue
    time.sleep(0.2)

当我将 time.sleep(0.2) 移动到 request_item 函数内的 for 循环中时,整个程序挂起。我不知道发生了什么。

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    for i in range(3):
        try:
            time.sleep(0.2)
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue

谁能稍微解释一下?还有更好的解决方案吗? 我认为有一些解决方案,但我无法测试。例如,获取 loop.time(),并为每 600 个请求暂停整个事件循环。或者,将 600 个请求添加到 task_list 并将它们收集为一个组,完成后,再次 asyncio.run(get_item(req_ids)) 再添加 600 个请求。

这是我的全部代码。

import aiohttp
import asyncio
import httpx
import json
import math
import os
import time

tk = 'xxxxxxxx'
url_template = 'https://api.guildwars2.com/v2/items'

# get items list
req_param = {'access_token': tk}
item_list_resp = httpx.get(url_template, params=req_param)
items = item_list_resp.json()

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    for i in range(3):
        try:
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue
    # since the game API limit requests, I think it's ok to suspend program for a while
    time.sleep(0.2)

async def get_item(item_ids: list):
    task_list = []
    async with aiohttp.ClientSession() as session:
        for item_id in item_ids:
            req = request_item(session, item_id)
            task = asyncio.create_task(req)
            task_list.append(task) 
        await asyncio.gather(*task_list)

asyncio.run(get_item(req_ids))

您正在使用 time.sleep() 而不是 await asyncio.sleep()。堵洞执行N秒,做错地方

事情是这样的。 当你 运行

for item_id in item_ids:
   req = request_item(session, item_id)
   task = asyncio.create_task(req)
   task_list.append(task)

您只需安排您的请求,而不是 运行 安排它。 (例如,你有 1000 item_ids)所以你安排了 1000 个任务,当你 运行 await asyncio.gather(*task_list) 你实际上等待所有这 1000 个任务将被执行。他们会立刻开火。

但是在每个任务中你 运行 time.sleep(0.2) 并且你必须等待 1000*0.2 秒。一次记住所有任务 运行,并且通常以随机顺序。所以你 运行 任务 1 并等待 0.2 秒,然后启动任务 2 并等待 0.2 秒,然后任务 999 启动并等待 0.2 秒等等。

最简单的解决方案是在发出 600 个请求后等待一分钟。你需要在 get_item 里面减速。示例代码(我没有测试):

async def get_item(item_ids: list):
    task_list = []
    async with aiohttp.ClientSession() as session:
        for n, item_id in enumerate(item_ids):
            req = request_item(session, item_id)
            task = asyncio.create_task(req)
            task_list.append(task)
            if n % 600 == 0:
                await asyncio.gather(*task_list)
                await asyncio.sleep(60)
                task_list = []

我推荐你使用一个库 asyncio-throttle

PS。由于速率限制为每分钟 600,我认为您不需要 asyncio,因为我非常确定 600 个并发请求将在 5-10 秒内执行。检查两次是否您的 600 请求使用经典 requests 线程超过 1 分钟。