python asyncio as_completed 疑惑

python asyncio as_completed doubts

在阅读了很多关于 asyncio 的文章之后(我对此一窍不通),我设法编写了一些简单的程序来执行我希望它们执行的操作。

然而,我对 as_completed 方法有一些疑问:它在内部如何工作以及它如何影响我的 cpu 使用。

所以,有以下片段:

#as_completed_example.py
import asyncio
import tqdm
import datetime
import sys
import signal
import random
#--------------------------------------------------------------------------------------
async def heavy_load(i):
    #tqdm.tqdm.write('#DEBUG    '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+str(i))
    await asyncio.sleep(random.random())
    return None
#--------------------------------------------------------------------------------------
async def main():
    length  =   int(sys.argv[1])
    inputs  =   list(range(length))
    pbar    =   tqdm.tqdm(total=len(inputs),position=0,leave=True,bar_format='#PROGRESS {desc}: {percentage:.3f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}')
    tasks   =   [heavy_load(i) for i in inputs]
    for future in asyncio.as_completed(tasks):
        _ = await future
        pbar.update(1)
        pbar.refresh()            
#---------------------------------------------------------------------------    
def sigint_handler(signum,frame):
    tqdm.tqdm.write('#INFO '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' user aborted execution!')
    sys.exit(0)
#---------------------------------------------------------------------------    
if(__name__=='__main__'):
    signal.signal(signal.SIGINT,sigint_handler)
    asyncio.run(main())     

如果我将其命名为 python3 as_completed_example.py 1000 ,它可以完美运行。然而,如果我将其称为 as_completed_example.py 1000000 (large number),我会发现我的进度条在很长一段时间内都停留在 0%:

-虽然我的进度条是 0%,

--我的 cpu 发生了什么? 因为它需要一个核心才能达到 100% 使用率

--为什么我过了一段时间 as_completed 还没有收到任何 future

这里有几个问题,如果可能的话应该避免在SO上。但是你的前两个问题有一个简单的答案。为什么它会固定一个 CPU,为什么在打印进度条之前会有延迟?有很多 single-threaded 工作要做。

asyncio 运行 一切都在一个线程中,除非你明确地不这样做。您正在构建的任务需要在 asyncio 中进行大量内部设置,尤其是对 as_completed 的调用。您可以查看 at the source 了解详细信息,但它必须:

  • 为你的未来创造一个 set。不算太贵,但也不是免费的。
  • 设置生产者和消费者队列来控制尚未运行的任务和已经完成的任务。对于您正在使用的如此大量的任务,这可能会导致多次大量分配,这可能是一个真正的杀手。
  • 在期货完成时安排回调 运行。这主要是将它们从队列移到另一个队列,并将它们从期货的 set 中移除,其中 none 是免费的。
  • 每个期货收益

事实上这里有很多设置,而且这需要相当多的时间,通过改变输入的大小可以很容易地看出这一点。在我的笔记本电脑上,任何期货 运行 真正从大小 100000 开始跌落悬崖之前的时间。此外,它下降了 non-linearly,这表明这个大小对我机器上的内存层次结构特别不利(例如,在这个大小之后有更多的缓存未命中)。

我还发现 asyncio 事件循环的 解决方案 可能在这里发挥了作用。时间必须随着期货的消耗而流逝。您正在创建许多期货,其中许多期货几乎同时完成(正如@user4815162342 在评论中正确指出的那样),在事件循环的单个滴答声中。事件循环的每个滴答声需要大量工作,而且必须全部在单个线程上完成。

这很清楚,事实上整个事情需要超过 1 秒才能完成。最大睡眠间隔为 1 秒,因为 random.random 为您提供了 [0, 1.0) 的值,但整个应用程序需要更长的时间。因此,这里进行的工作不仅仅是“一秒钟的价值”,而且所有工作都在一个线程中。