将 tqdm 用于进度条时多处理的 join() 输出

join() output from multiprocessing when using tqdm for progress bar

我正在使用类似于 的构造来 运行 我的处理与 tqdm...

提供的进度条并行
from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
    results = p.join()  ## My attempt to combine results

results 虽然总是 NoneType,但我不知道如何合并我的结果。我知道 with ...: 会在完成时自动关闭正在处理的内容。

我试过去掉外层with:

if __name__ == '__main__':
    max_ = 10
    p = Pool(processes=8)
    with tqdm(total=max_) as pbar:
        for _ in p.imap_unordered(_foo, range(0, max_)):
            pbar.update()
    p.close()
    results = p.join()
    print(f"Results : {results}")

对如何 join() 我的结果感到困惑?

您对 p.join() 的调用只是等待所有池进程结束并且 returns None。这个调用实际上是不必要的,因为您将池用作上下文管理器,也就是说您已经指定 with Pool(processes=2) as p:)。当该块终止时,将对 p.terminate() 进行隐式调用,这会立即终止池进程和任何可能正在 运行 或排队到 运行 的任务(有 none 在你的情况下)。

实际上,通过调用 return 迭代 迭代器 return 每个 return 来自你的辅助函数的值,_foo。但是由于您使用的是 imap_unordered 方法,因此 returned 的结果可能不符合提交顺序。换句话说,您不能假设 return 值将连续为 0、1、4、9 等。有很多方法可以处理这个问题,例如让您的 worker 函数 return原始参数以及平方值:

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   return my_number, square # return the argunent along with the result

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        results = [None] * 30; # preallocate the resulys array
        with tqdm(total=max_) as pbar:
            for x, result in p.imap_unordered(_foo, range(0, max_)):
                results[x] = result
                pbar.update()
        print(results)

第二种方法是不使用 imap_unordered,而是 apply_async 和回调函数。这样做的缺点是,对于大型可迭代对象,您无法像使用 imap_unordered:

那样指定 chunksize 参数
from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   return square

if __name__ == '__main__':
    def my_callback(_): # ignore result
        pbar.update() # update progress bar when a result is produced

    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            async_results = [p.apply_async(_foo, (x,), callback=my_callback) for x in range(0, max_)]
            # wait for all tasks to complete:
            p.close()
            p.join()
            results = [async_result.get() for async_result in async_results]
        print(results)