Python Aiohttp Asyncio: how to create delays between each task

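The problem I'm trying to solve: I am making many API requests to a server. I am trying to create delays between the async API calls to comply with the server's rate-limit policy.

What I want it to do: I want it to behave like this:

  1. Make API request #1
  2. Wait 0.1 seconds
  3. Make API request #2
  4. Wait 0.1 seconds ... and so on ...
  5. Repeat until all requests are made
  6. Gather the responses and return them in one object (results)

The issue: When I introduce asyncio.sleep() or time.sleep() in the code, the API requests are still made almost instantaneously. It seems to delay the execution of print(), but not the API requests. I suspect that I have to create the delays within the loop rather than in fetch_one() or fetch_all(), but I couldn't figure out how to do so.

Code block: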

async def fetch_all(loop, urls, delay): 
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):

    #time.sleep(delay)
    #asyncio.sleep(delay)

    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            return await resp

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))

Versions I'm using: 
python                    3.8.5
aiohttp                   3.7.4
asyncio                   3.4.3

Any tips pointing me in the right direction would be much appreciated!

When you use asyncio.gather, you run all the fetch_one coroutines concurrently. They all wait for delay together and then all call the API at practically the same instant.
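To see this without touching a real server, here is a minimal sketch. The names fake_fetch, run_all and demo are made up for illustration, and the "request" is only a timestamp recorded after the sleep, so this shows the scheduling behaviour rather than real aiohttp traffic.

```python
import asyncio
import time


async def fake_fetch(i, delay, started):
    # Hypothetical stand-in for fetch_one: sleep first, then note when
    # the "request" would have been sent.
    await asyncio.sleep(delay)
    started.append(time.monotonic())
    return i


async def run_all(n, delay):
    started = []
    t0 = time.monotonic()
    # gather schedules every coroutine at once, so all sleeps overlap.
    results = await asyncio.gather(
        *[fake_fetch(i, delay, started) for i in range(n)]
    )
    # Offset of each "request" relative to the start of the batch.
    return results, [s - t0 for s in started]


def demo(n=5, delay=0.1):
    return asyncio.run(run_all(n, delay))


if __name__ == "__main__":
    results, offsets = demo()
    print(results)
    print([round(o, 3) for o in offsets])
```

Run as a script, the printed offsets should all sit close to delay instead of being spaced delay apart, which matches the behaviour described in the question.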

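To solve this, you should either await the fetch_one calls one after another inside fetch_all, or use a Semaphore to signal that the next call must not start before the previous one is done.

Here's the idea: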

import asyncio
from ssl import SSLContext

import aiohttp

_sem = asyncio.Semaphore(1)


async def fetch_all(loop, urls, delay): 
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):

    async with _sem:  # the next coroutine(s) will be stuck here until the previous one is done
        await asyncio.sleep(delay)

        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url, ssl=SSLContext()) as resp:
                # print("An api call to ", url, " is made at ", time.time())
                # print(resp)
                # Read the body while the connection is open; the response object itself is not awaitable.
                return await resp.text()

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
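The other option mentioned above, awaiting the calls one after another inside fetch_all, could look roughly like the sketch below. It is an illustration under assumptions: fake_fetch is a hypothetical stand-in for the real aiohttp request, and fetch_all_sequential is a made-up name. The try/except imitates what return_exceptions=True does in gather.

```python
import asyncio
import time


async def fake_fetch(url):
    # Hypothetical stand-in for the real request: note when it starts,
    # then pretend the server takes a moment to answer.
    started = time.monotonic()
    await asyncio.sleep(0.01)
    return url, started


async def fetch_all_sequential(urls, delay):
    results = []
    for i, url in enumerate(urls):
        if i:  # no pause before the very first request
            await asyncio.sleep(delay)
        try:
            results.append(await fake_fetch(url))
        except Exception as exc:  # collect errors instead of raising
            results.append(exc)
    return results


if __name__ == "__main__":
    for url, started in asyncio.run(fetch_all_sequential(["a", "b", "c"], 0.1)):
        print(url, round(started, 3))
```

Each request here starts only after the previous one has finished and delay has elapsed, so the spacing is guaranteed, at the cost of never having two requests in flight at once.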

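The call to asyncio.gather will launch all the requests "simultaneously". On the other hand, if you simply use a lock or await each task in turn, you gain nothing from concurrency.

If you know the rate at which you are allowed to issue requests, the simplest approach is to lengthen the asynchronous pause before each successive request. A simple global variable can do that: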


import asyncio
from ssl import SSLContext

import aiohttp

next_delay = 0.1

async def fetch_all(loop, urls, delay): 
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):
    global next_delay
    
    # Each call claims a pause that is `delay` longer than the previous call's.
    next_delay += delay
    await asyncio.sleep(next_delay)

    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            # Read the body while the connection is open; the response object itself is not awaitable.
            return await resp.text()

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
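A variation on the same idea that avoids the global is to derive each pause from the request's position in the list (i * delay). The sketch below is not the code from this answer: fake_fetch and fetch_all_staggered are hypothetical names, and the simulated response time of 0.2 seconds is an arbitrary assumption chosen to be longer than the spacing, so that requests overlap.

```python
import asyncio
import time


async def fake_fetch(i, start_after, t0):
    # Wait for this request's own slot, then record when it "starts".
    await asyncio.sleep(start_after)
    started = time.monotonic() - t0
    # Simulated slow response (assumed value), longer than the spacing.
    await asyncio.sleep(0.2)
    return i, started


async def fetch_all_staggered(n, delay):
    t0 = time.monotonic()
    # Request i is scheduled to start i * delay after the batch begins.
    return await asyncio.gather(
        *[fake_fetch(i, i * delay, t0) for i in range(n)]
    )


if __name__ == "__main__":
    for i, started in asyncio.run(fetch_all_staggered(4, 0.1)):
        print(i, round(started, 3))
```

The start times are spaced roughly delay apart, while the slow responses still overlap, so the concurrency that gather provides is kept.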

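Now, if you want to issue 5 requests and then the next 5, you can use a synchronization primitive such as asyncio.Condition, calling its wait_for with an expression that checks how many API calls are active. The code below takes a similar approach with asyncio.Event: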

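import asyncio
from ssl import SSLContext

import aiohttp

active_calls = 0

MAX_CALLS = 5

async def fetch_all(loop, urls, delay): 
    event = asyncio.Event()
    event.set()
    results = await asyncio.gather(*[fetch_one(loop, url, delay, event) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay, event):
    global active_calls
    
    # Wait while a full batch is in flight; re-check after waking up,
    # because an earlier waiter may already have filled the new batch.
    while not event.is_set():
        await event.wait()

    active_calls += 1
    if active_calls >= MAX_CALLS:
        event.clear()  # batch is full: hold back everyone else
    
    try:
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url, ssl=SSLContext()) as resp:
                # print("An api call to ", url, " is made at ", time.time())
                # print(resp)
                # Read the body while the connection is open; the response object itself is not awaitable.
                return await resp.text()
    finally:
        active_calls -= 1
        if active_calls == 0:
            event.set()  # batch has drained: release the next one
        

delay = 0.1  # unused in this variant, kept so the call signature matches the other examples
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))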

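For both examples, if your design should avoid global variables (really, these are "module" variables), you can move all the functionality into a class and work on an instance, promoting the global variables to instance attributes. Alternatively, use a mutable container, such as a list that holds the active_calls value in its first item, and pass it around as an argument.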
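As a rough sketch of the class-based variant, the batch logic could live in a small async context manager whose state is held in instance attributes. BatchGate, fake_fetch and main are hypothetical names introduced here for illustration, the peak counter exists only to make the behaviour observable, and the request is simulated with asyncio.sleep instead of aiohttp.

```python
import asyncio


class BatchGate:
    """Hypothetical helper: admits up to max_calls callers, then holds
    the rest back until the whole batch has finished."""

    def __init__(self, max_calls):
        self.max_calls = max_calls
        self.active_calls = 0
        self.peak = 0        # highest concurrency seen, for inspection
        self._event = None   # created lazily, inside the running loop

    async def __aenter__(self):
        if self._event is None:
            self._event = asyncio.Event()
            self._event.set()
        # Re-check after every wake-up: an earlier waiter may already
        # have filled the new batch and cleared the event again.
        while not self._event.is_set():
            await self._event.wait()
        self.active_calls += 1
        self.peak = max(self.peak, self.active_calls)
        if self.active_calls >= self.max_calls:
            self._event.clear()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.active_calls -= 1
        if self.active_calls == 0:
            self._event.set()  # batch drained: admit the next one
        return False


async def fake_fetch(gate, i):
    async with gate:
        await asyncio.sleep(0.01)  # simulated request
        return i


async def main(n, max_calls):
    gate = BatchGate(max_calls)
    results = await asyncio.gather(*[fake_fetch(gate, i) for i in range(n)])
    return results, gate


if __name__ == "__main__":
    results, gate = asyncio.run(main(12, 5))
    print(results, gate.peak)
```

Because the counter and the event belong to the instance, several independent gates can coexist (for example, one per API host) without sharing module-level state.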