python asyncio as_completed 疑惑
python asyncio as_completed doubts
在阅读了很多关于 asyncio 的文章之后(我对此一窍不通),我设法编写了一些简单的程序来执行我希望它们执行的操作。
然而,我对 as_completed 方法有一些疑问:它在内部如何工作以及它如何影响我的 cpu 使用。
所以,有以下片段:
#as_completed_example.py
import asyncio
import tqdm
import datetime
import sys
import signal
import random
#--------------------------------------------------------------------------------------
async def heavy_load(i):
#tqdm.tqdm.write('#DEBUG '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+str(i))
await asyncio.sleep(random.random())
return None
#--------------------------------------------------------------------------------------
async def main():
length = int(sys.argv[1])
inputs = list(range(length))
pbar = tqdm.tqdm(total=len(inputs),position=0,leave=True,bar_format='#PROGRESS {desc}: {percentage:.3f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}')
tasks = [heavy_load(i) for i in inputs]
for future in asyncio.as_completed(tasks):
_ = await future
pbar.update(1)
pbar.refresh()
#---------------------------------------------------------------------------
def sigint_handler(signum,frame):
tqdm.tqdm.write('#INFO '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' user aborted execution!')
sys.exit(0)
#---------------------------------------------------------------------------
if(__name__=='__main__'):
signal.signal(signal.SIGINT,sigint_handler)
asyncio.run(main())
如果我将其命名为 python3 as_completed_example.py 1000
,它可以完美运行。然而,如果我将其称为 as_completed_example.py 1000000 (large number)
,我会发现我的进度条在很长一段时间内都停留在 0%:
-虽然我的进度条是 0%,
--我的 cpu 发生了什么? 因为它需要一个核心才能达到 100% 使用率
--为什么我过了一段时间 as_completed 还没有收到任何 future
?
这里有几个问题,如果可能的话应该避免在SO上。但是你的前两个问题有一个简单的答案。为什么它会固定一个 CPU,为什么在打印进度条之前会有延迟?有很多 single-threaded 工作要做。
asyncio
运行 一切都在一个线程中,除非你明确地不这样做。您正在构建的任务需要在 asyncio
中进行大量内部设置,尤其是对 as_completed
的调用。您可以查看 at the source 了解详细信息,但它必须:
- 为你的未来创造一个
set
。不算太贵,但也不是免费的。
- 设置生产者和消费者队列来控制尚未运行的任务和已经完成的任务。对于您正在使用的如此大量的任务,这可能会导致多次大量分配,这可能是一个真正的杀手。
- 在期货完成时安排回调 运行。这主要是将它们从队列移到另一个队列,并将它们从期货的
set
中移除,其中 none 是免费的。
- 每个期货收益
事实上这里有很多设置,而且这需要相当多的时间,通过改变输入的大小可以很容易地看出这一点。在我的笔记本电脑上,任何期货 运行 真正从大小 100000
开始跌落悬崖之前的时间。此外,它下降了 non-linearly,这表明这个大小对我机器上的内存层次结构特别不利(例如,在这个大小之后有更多的缓存未命中)。
我还发现 asyncio
事件循环的 解决方案 可能在这里发挥了作用。时间必须随着期货的消耗而流逝。您正在创建许多期货,其中许多期货几乎同时完成(正如@user4815162342 在评论中正确指出的那样),在事件循环的单个滴答声中。事件循环的每个滴答声需要大量工作,而且必须全部在单个线程上完成。
这很清楚,事实上整个事情需要超过 1 秒才能完成。最大睡眠间隔为 1 秒,因为 random.random
为您提供了 [0, 1.0)
的值,但整个应用程序需要更长的时间。因此,这里进行的工作不仅仅是“一秒钟的价值”,而且所有工作都在一个线程中。
在阅读了很多关于 asyncio 的文章之后(我对此一窍不通),我设法编写了一些简单的程序来执行我希望它们执行的操作。
然而,我对 as_completed 方法有一些疑问:它在内部如何工作以及它如何影响我的 cpu 使用。
所以,有以下片段:
#as_completed_example.py
import asyncio
import tqdm
import datetime
import sys
import signal
import random
#--------------------------------------------------------------------------------------
async def heavy_load(i):
#tqdm.tqdm.write('#DEBUG '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+str(i))
await asyncio.sleep(random.random())
return None
#--------------------------------------------------------------------------------------
async def main():
length = int(sys.argv[1])
inputs = list(range(length))
pbar = tqdm.tqdm(total=len(inputs),position=0,leave=True,bar_format='#PROGRESS {desc}: {percentage:.3f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}')
tasks = [heavy_load(i) for i in inputs]
for future in asyncio.as_completed(tasks):
_ = await future
pbar.update(1)
pbar.refresh()
#---------------------------------------------------------------------------
def sigint_handler(signum,frame):
tqdm.tqdm.write('#INFO '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' user aborted execution!')
sys.exit(0)
#---------------------------------------------------------------------------
if(__name__=='__main__'):
signal.signal(signal.SIGINT,sigint_handler)
asyncio.run(main())
如果我将其命名为 python3 as_completed_example.py 1000
,它可以完美运行。然而,如果我将其称为 as_completed_example.py 1000000 (large number)
,我会发现我的进度条在很长一段时间内都停留在 0%:
-虽然我的进度条是 0%,
--我的 cpu 发生了什么? 因为它需要一个核心才能达到 100% 使用率
--为什么我过了一段时间 as_completed 还没有收到任何 future
?
这里有几个问题,如果可能的话应该避免在SO上。但是你的前两个问题有一个简单的答案。为什么它会固定一个 CPU,为什么在打印进度条之前会有延迟?有很多 single-threaded 工作要做。
asyncio
运行 一切都在一个线程中,除非你明确地不这样做。您正在构建的任务需要在 asyncio
中进行大量内部设置,尤其是对 as_completed
的调用。您可以查看 at the source 了解详细信息,但它必须:
- 为你的未来创造一个
set
。不算太贵,但也不是免费的。 - 设置生产者和消费者队列来控制尚未运行的任务和已经完成的任务。对于您正在使用的如此大量的任务,这可能会导致多次大量分配,这可能是一个真正的杀手。
- 在期货完成时安排回调 运行。这主要是将它们从队列移到另一个队列,并将它们从期货的
set
中移除,其中 none 是免费的。 - 每个期货收益
事实上这里有很多设置,而且这需要相当多的时间,通过改变输入的大小可以很容易地看出这一点。在我的笔记本电脑上,任何期货 运行 真正从大小 100000
开始跌落悬崖之前的时间。此外,它下降了 non-linearly,这表明这个大小对我机器上的内存层次结构特别不利(例如,在这个大小之后有更多的缓存未命中)。
我还发现 asyncio
事件循环的 解决方案 可能在这里发挥了作用。时间必须随着期货的消耗而流逝。您正在创建许多期货,其中许多期货几乎同时完成(正如@user4815162342 在评论中正确指出的那样),在事件循环的单个滴答声中。事件循环的每个滴答声需要大量工作,而且必须全部在单个线程上完成。
这很清楚,事实上整个事情需要超过 1 秒才能完成。最大睡眠间隔为 1 秒,因为 random.random
为您提供了 [0, 1.0)
的值,但整个应用程序需要更长的时间。因此,这里进行的工作不仅仅是“一秒钟的价值”,而且所有工作都在一个线程中。