将 tqdm 用于进度条时多处理的 join() 输出
join() output from multiprocessing when using tqdm for progress bar
我正在使用类似于 的构造来 运行 我的处理与 tqdm
...
提供的进度条并行
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return square
if __name__ == '__main__':
with Pool(processes=2) as p:
max_ = 30
with tqdm(total=max_) as pbar:
for _ in p.imap_unordered(_foo, range(0, max_)):
pbar.update()
results = p.join() ## My attempt to combine results
results
虽然总是 NoneType
,但我不知道如何合并我的结果。我知道 with ...:
会在完成时自动关闭正在处理的内容。
我试过去掉外层with:
if __name__ == '__main__':
max_ = 10
p = Pool(processes=8)
with tqdm(total=max_) as pbar:
for _ in p.imap_unordered(_foo, range(0, max_)):
pbar.update()
p.close()
results = p.join()
print(f"Results : {results}")
对如何 join()
我的结果感到困惑?
您对 p.join()
的调用只是等待所有池进程结束并且 returns None
。这个调用实际上是不必要的,因为您将池用作上下文管理器,也就是说您已经指定 with Pool(processes=2) as p:
)。当该块终止时,将对 p.terminate()
进行隐式调用,这会立即终止池进程和任何可能正在 运行 或排队到 运行 的任务(有 none 在你的情况下)。
实际上,通过调用 return 迭代 迭代器 return 每个 return 来自你的辅助函数的值,_foo
。但是由于您使用的是 imap_unordered
方法,因此 returned 的结果可能不符合提交顺序。换句话说,您不能假设 return 值将连续为 0、1、4、9 等。有很多方法可以处理这个问题,例如让您的 worker 函数 return原始参数以及平方值:
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return my_number, square # return the argunent along with the result
if __name__ == '__main__':
with Pool(processes=2) as p:
max_ = 30
results = [None] * 30; # preallocate the resulys array
with tqdm(total=max_) as pbar:
for x, result in p.imap_unordered(_foo, range(0, max_)):
results[x] = result
pbar.update()
print(results)
第二种方法是不使用 imap_unordered
,而是 apply_async
和回调函数。这样做的缺点是,对于大型可迭代对象,您无法像使用 imap_unordered
:
那样指定 chunksize 参数
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return square
if __name__ == '__main__':
def my_callback(_): # ignore result
pbar.update() # update progress bar when a result is produced
with Pool(processes=2) as p:
max_ = 30
with tqdm(total=max_) as pbar:
async_results = [p.apply_async(_foo, (x,), callback=my_callback) for x in range(0, max_)]
# wait for all tasks to complete:
p.close()
p.join()
results = [async_result.get() for async_result in async_results]
print(results)
我正在使用类似于 tqdm
...
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return square
if __name__ == '__main__':
with Pool(processes=2) as p:
max_ = 30
with tqdm(total=max_) as pbar:
for _ in p.imap_unordered(_foo, range(0, max_)):
pbar.update()
results = p.join() ## My attempt to combine results
results
虽然总是 NoneType
,但我不知道如何合并我的结果。我知道 with ...:
会在完成时自动关闭正在处理的内容。
我试过去掉外层with:
if __name__ == '__main__':
max_ = 10
p = Pool(processes=8)
with tqdm(total=max_) as pbar:
for _ in p.imap_unordered(_foo, range(0, max_)):
pbar.update()
p.close()
results = p.join()
print(f"Results : {results}")
对如何 join()
我的结果感到困惑?
您对 p.join()
的调用只是等待所有池进程结束并且 returns None
。这个调用实际上是不必要的,因为您将池用作上下文管理器,也就是说您已经指定 with Pool(processes=2) as p:
)。当该块终止时,将对 p.terminate()
进行隐式调用,这会立即终止池进程和任何可能正在 运行 或排队到 运行 的任务(有 none 在你的情况下)。
实际上,通过调用 return 迭代 迭代器 return 每个 return 来自你的辅助函数的值,_foo
。但是由于您使用的是 imap_unordered
方法,因此 returned 的结果可能不符合提交顺序。换句话说,您不能假设 return 值将连续为 0、1、4、9 等。有很多方法可以处理这个问题,例如让您的 worker 函数 return原始参数以及平方值:
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return my_number, square # return the argunent along with the result
if __name__ == '__main__':
with Pool(processes=2) as p:
max_ = 30
results = [None] * 30; # preallocate the resulys array
with tqdm(total=max_) as pbar:
for x, result in p.imap_unordered(_foo, range(0, max_)):
results[x] = result
pbar.update()
print(results)
第二种方法是不使用 imap_unordered
,而是 apply_async
和回调函数。这样做的缺点是,对于大型可迭代对象,您无法像使用 imap_unordered
:
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
return square
if __name__ == '__main__':
def my_callback(_): # ignore result
pbar.update() # update progress bar when a result is produced
with Pool(processes=2) as p:
max_ = 30
with tqdm(total=max_) as pbar:
async_results = [p.apply_async(_foo, (x,), callback=my_callback) for x in range(0, max_)]
# wait for all tasks to complete:
p.close()
p.join()
results = [async_result.get() for async_result in async_results]
print(results)