map_async 和进度条的多处理

multiprocessing with map_async and progress bar

是否可以使用 map_asyncmultiprocessing 获得进度条:

玩具示例:

from multiprocessing import Pool
import tqdm

def f(x):
    print(x)
    return x*x

n_job = 4
with Pool(processes=n_job) as pool:
    results = pool.map_async(f, range(10)).get()
print(results)

像这样:

data = []

with Pool(processes=10) as pool:
        for d in tqdm.tqdm(
                pool.imap(f, range(10)),
                total=10):
            data.append(d)

我想就是这样了:

from multiprocessing import Pool
import tqdm

def f(x):
    return x*x

n_job = 4

data = []
with Pool(processes=10) as pool:
        for d in tqdm.tqdm(
                pool.map_async(f, range(10)).get(),
                total=10):
            data.append(d)
print(data)

我能想到有几种方法可以实现您想要的:

  1. 将 apply_async 与 回调 参数结合使用,以便在每个结果可用时更新进度条。
  2. 使用 imap 并在迭代结果时更新进度条。

imap有一个轻微的问题,因为结果必须按task-submission顺序返回,这当然是你想要的。但是该顺序不一定反映提交任务的完成顺序,因此进度条不一定像其他情况下那样频繁更新。但我将首先展示该解决方案,因为它是最简单且可能足够的:

from multiprocessing import Pool
import tqdm

def f(x):
    import time
    time.sleep(1) # for demo purposes
    return x*x

# Required by Windows:
if __name__ == '__main__':
    pool_size = 4
    results = []
    with Pool(processes=pool_size) as pool:
        with tqdm.tqdm(total=10) as pbar:
            for result in pool.imap(f, range(10)):
                results.append(result)
                pbar.update()
    print(results)

使用apply_async的解决方案:

from multiprocessing import Pool
import tqdm

def f(x):
    import time
    time.sleep(1) # for demo purposes
    return x*x

# Required by Windows:
if __name__ == '__main__':
    def my_callback(_):
        # We don't care about the actual result.
        # Just update the progress bar:
        pbar.update()

    pool_size = 4
    with Pool(processes=pool_size) as pool:
        with tqdm.tqdm(total=10) as pbar:
            async_results = [pool.apply_async(f, args=(x,), callback=my_callback) for x in range(10)]
            results = [async_result.get() for async_result in async_results]
    print(results)