等待任何未来的异步

await for any future asyncio

我正在尝试使用 asyncio 来处理并发网络 I/O。大量功能将被安排在一个点上,每个功能完成所需的时间差异很大。接收到的数据随后在每个输出的单独进程中进行处理。

处理数据的顺序无关紧要,因此考虑到输出的等待时间可能非常长,我想 await 无论未来先完成什么,而不是预定义的顺序。

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:
       await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

正常情况下,按照期货排队的顺序等待即可:

蓝色表示每个任务在执行器队列中的时间,即 run_in_executor 已被调用,但函数尚未执行,因为执行器同时仅运行 5 个任务;绿色是执行函数本身所花费的时间;红色是等待所有之前的期货到 await.

所花费的时间

在我的情况下,函数随时间变化很大,等待队列中的前一个 futures 会浪费很多时间,而我可以在本地处理 GET 输出。这使我的系统闲置了一段时间,当多个输出同时完成时才变得不知所措,然后跳回闲置等待更多请求完成。

有没有办法 await 执行器中首先完成的任何 future?

您似乎在寻找 asyncio.waitreturn_when=asyncio.FIRST_COMPLETED

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        done, futures = await asyncio.wait(futures, 
            loop=loop, return_when=asyncio.FIRST_COMPLETED)  
        for f in done:
            await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())