Why can't I join a child process created by ProcessPoolExecutor or Pool?

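I want to manually join and close the child processes of a multiprocessing.Pool or a ProcessPoolExecutor. However, whenever I try to join a child process created by either of these pools, the code hangs indefinitely.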

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from multiprocessing import current_process, active_children

class Example:
    def start(self):
        with ProcessPoolExecutor(max_workers=1) as executor:
            futures = set()
            futures.add(executor.submit(self.worker))
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            for task in done:
                res = task.result()
                print(f"Child pid is {res}")
                for child in active_children():
                    if child.pid == res:
                        print("Child found")
                        child.join()
                        child.close()
                print("Child closed")


    def worker(self):
        print("Worker called")
        return current_process().pid

ex = Example()
ex.start()

Or, with multiprocessing.Pool:

from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        with Pool(processes=1) as pool:
            task = pool.apply_async(self.worker)

            res = task.get()
            print(f"Child pid is {res}")
            for child in active_children():
                if child.pid == res:
                    print("Child found")
                    child.join()
                    child.close()
            print("Child closed")


    def worker(self):
        print("Worker called")
        return current_process().pid

ex = Example()
ex.start()

Output:

Worker called
Child pid is 284108
Child found

Why?

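A multiprocessing pool consists of one or more processes that take submitted "tasks" from an input queue, run each task to completion, and then go back to the queue to fetch the next one. These processes keep running until you terminate the entire pool, implicitly or explicitly, by one of the methods described below. The important point is this: when you submit a task to a pool (for example with concurrent.futures.ProcessPoolExecutor.submit or multiprocessing.pool.Pool.apply_async), the worker function you pass is executed in an already running process. That process cannot be joined until it terminates, and it will not terminate unless you take specific action to make it do so.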
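To make that concrete, here is a simplified, hand-built model of the loop a pool worker runs, using multiprocessing.Process and multiprocessing.Queue. It is only a sketch, not how either pool is actually implemented; worker_loop, demo and the None sentinel are hypothetical names chosen for illustration. It shows that join() on a worker whose task has already finished still times out, and only returns once the worker is told to exit.

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical "please exit" marker used only in this sketch


def worker_loop(tasks, results):
    # Simplified model of a pool worker: fetch a task, run it, go back for more.
    # The process only exits when it receives the sentinel.
    while True:
        item = tasks.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        results.put(item * item)


def demo():
    tasks, results = mp.Queue(), mp.Queue()
    p = mp.Process(target=worker_loop, args=(tasks, results))
    p.start()
    tasks.put(3)
    answer = results.get()       # the task has finished...
    p.join(timeout=1)            # ...but join() cannot complete yet:
    still_alive = p.is_alive()   # the worker is blocked in tasks.get()
    tasks.put(SENTINEL)          # the equivalent of shutting the pool down
    p.join()                     # now join() returns promptly
    return answer, still_alive, p.exitcode


if __name__ == "__main__":
    print(demo())  # expected: (9, True, 0)
```

A real pool sends its workers an equivalent "stop" signal only when you shut it down, which is why joining one of its workers before that point blocks forever.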

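But there is no reason to explicitly call join on an individual process in a processing pool, and doing so will block, because these processes do not terminate until one of the following happens. With a concurrent.futures.ProcessPoolExecutor pool, you either call shutdown(wait=True) and all submitted tasks have completed, or you call shutdown(wait=False) (the workers then exit on their own once the pending tasks have finished). With a multiprocessing.pool.Pool pool, you either call terminate, or call pool.close() followed by pool.join(), which joins all the pool processes; they terminate once all submitted tasks have completed. But at that point there are no longer any running pool processes left to join. For example, if we call pool.terminate():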

from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        pool = Pool(processes=1)
        task = pool.apply_async(self.worker)
        res = task.get()
        pool.terminate() # Now there are no more running processes:
        print(f"Child pid is {res}")
        # This will not find any active children:
        for child in active_children():
            if child.pid == res:
                print("Child found")
                child.join()
                child.close()
                print("Child closed")


    def worker(self):
        print("Worker called")
        return current_process().pid

# Required for Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()

Prints:

Worker called
Child pid is 18076

Or, if we wait for all tasks to complete and let the pool processes terminate on their own:

from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        pool = Pool(processes=1)
        task = pool.apply_async(self.worker)
        res = task.get()
        # wait for all tasks to complete:
        pool.close()
        pool.join()
        print(f"Child pid is {res}")
        # This will not find any active children:
        for child in active_children():
            if child.pid == res:
                print("Child found")
                child.join()
                child.close()
                print("Child closed")


    def worker(self):
        print("Worker called")
        return current_process().pid

# Required for Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()

Prints:

Worker called
Child pid is 19936
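The same reasoning applies to concurrent.futures.ProcessPoolExecutor. Exiting a `with ProcessPoolExecutor(...)` block calls shutdown(wait=True), which waits for the workers and joins them for you. This is also why the question's code hangs: child.join() is called inside the with block, before that shutdown has run. The sketch below (worker_pid and demo are illustrative names, not part of any library) checks whether the worker appears in active_children() inside the block and again after it.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import active_children


def worker_pid():
    # Executed inside a pool worker process.
    return os.getpid()


def demo():
    with ProcessPoolExecutor(max_workers=1) as executor:
        pid = executor.submit(worker_pid).result()
        # The task is done, but its worker is still alive, waiting for more work.
        alive_inside = any(child.pid == pid for child in active_children())
    # Leaving the with-block called executor.shutdown(wait=True), which waits
    # for the worker processes to exit and joins them.
    alive_after = any(child.pid == pid for child in active_children())
    return alive_inside, alive_after


if __name__ == "__main__":
    print(demo())  # expected: (True, False)
```

Rather than joining workers yourself, let the executor's shutdown (or the pool's close/join or terminate) do it.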