A problem with apply_async in a multiprocessing Pool

I am using a multiprocessing Pool and its .apply_async() method in Python to run several workers concurrently.

But there is a problem when I use a with statement instead of creating the pool instance explicitly.

Here is what I have done so far:


Common code snippet:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []
def log_result(result):
    result_list.append(result)

First snippet, which works fine using the "Python 2 way":

tick = time()

pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i, ), callback=log_result)
pool.close()
pool.join()

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5

Second snippet, which works fine using the "Python 3 way":

tick = time()

with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)

print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iteration has been done.

Output:

0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.017550945281982
5

Problem:

Now here is the problem: I want to use the Python 2 way together with a with statement, as in the Python 3 approach, but the tasks never complete:

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

Total elapsed time:  0.10628008842468262
[]
5

However, if I put a sleep(1) after pool.apply_async(...), some of the shorter tasks do complete (the sleep effectively acts as a block):

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time:  6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5

What am I missing?

concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.

On exit, concurrent.futures.Executor calls shutdown(wait=True), which effectively waits for all queued jobs to finish, as per the documentation:

You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)
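As a sketch of what the with block does for an executor, the snippet below writes the shutdown(wait=True) call out explicitly. The worker is a scaled-down copy of the question's (it sleeps x * 0.1 seconds instead of x seconds, an assumption made only to keep the run short), and the helper name run_with_explicit_shutdown is hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def worker(x):
    # Scaled-down version of the question's worker: sleeps x/10 seconds
    sleep(x * 0.1)
    return f"{x} finished."


def run_with_explicit_shutdown():
    # Hypothetical helper: same work as the "with ProcessPoolExecutor()" snippet,
    # with the shutdown call that the context manager makes written out by hand
    executor = ProcessPoolExecutor()
    futures = [executor.submit(worker, i) for i in range(6)]
    # Blocks until every submitted job has finished, just like leaving the with block
    executor.shutdown(wait=True)
    return futures


if __name__ == "__main__":
    futures = run_with_explicit_shutdown()
    # Every future is already done, so result() returns without waiting
    print(all(f.done() for f in futures))  # True
    print([f.result() for f in futures])
```

Because shutdown(wait=True) does the waiting, no per-future bookkeeping is needed to keep the jobs alive.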

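multiprocessing.Pool's context manager, on the other hand, calls terminate() instead of close() followed by join(), which prematurely interrupts all jobs that are still running. From the documentation: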

Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
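One way to keep the question's "Python 2 way" inside a with block is to call close() and join() yourself before the block ends, so terminate() has nothing left to interrupt. This is a sketch with a scaled-down worker (sleeping x * 0.1 seconds, an assumption to keep it fast) and a hypothetical helper name, run_with_close_join.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    # Scaled-down version of the question's worker: sleeps x/10 seconds
    sleep(x * 0.1)
    return f"{x} finished."


def run_with_close_join():
    # Hypothetical helper: the question's "Python 2 way" moved inside a with block
    result_list = []
    with Pool() as pool:
        for i in range(6):
            # Callbacks run in the parent process, so appending to a local list works
            pool.apply_async(worker, args=(i,), callback=result_list.append)
        # Stop accepting new tasks, then block until every queued task (and its
        # callback) has completed. Only afterwards does __exit__() call terminate(),
        # and by then there is nothing left to interrupt.
        pool.close()
        pool.join()
    return result_list


if __name__ == "__main__":
    # Callbacks fire in completion order, so sort before printing
    print(sorted(run_with_close_join()))
```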

If you want to use multiprocessing.Pool with its context manager, you need to wait for the results yourself before the with block ends:

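with Pool() as pool:
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result)
                     for i in range(6)]
    # Block until every task has finished before __exit__() calls terminate()
    for async_result in async_results:
        async_result.wait()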
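Note that wait() only blocks; it does not report whether the task failed. As a sketch (scaled-down worker, and the hypothetical names failing_worker, collect_with_get, collect_with_map and wait_vs_get), AsyncResult.get() both waits and re-raises any exception from the worker, and Pool.map() blocks until all results are ready, so either one also keeps the work inside the with block until it is done.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    # Scaled-down version of the question's worker: sleeps x/10 seconds
    sleep(x * 0.1)
    return f"{x} finished."


def failing_worker(x):
    # Hypothetical task that always fails, to show how errors surface
    raise ValueError(f"bad input {x}")


def collect_with_get():
    with Pool() as pool:
        async_results = [pool.apply_async(worker, args=(i,)) for i in range(6)]
        # get() blocks until each result is ready; results keep submission order
        return [r.get() for r in async_results]


def collect_with_map():
    with Pool() as pool:
        # map() itself blocks until every result is ready
        return pool.map(worker, range(6))


def wait_vs_get():
    with Pool() as pool:
        r = pool.apply_async(failing_worker, args=(1,))
        r.wait()  # returns normally even though the task raised
        ok = r.successful()  # False, because the task raised
        try:
            r.get()  # re-raises the worker's ValueError in the parent
        except ValueError as exc:
            return ok, str(exc)


if __name__ == "__main__":
    print(collect_with_get())
    print(collect_with_map())
    print(wait_vs_get())  # (False, 'bad input 1')
```

Using get() instead of a callback also avoids the shared result_list, since the results are returned directly in submission order.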