Why can't I join a child process created by ProcessPoolExecutor or Pool?
I want to manually join and close child processes created by multiprocessing.Pool or ProcessPoolExecutor. However, whenever I try to join a child process created by either of these pools, the code hangs indefinitely.
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from multiprocessing import current_process, active_children

class Example:
    def start(self):
        with ProcessPoolExecutor(max_workers=1) as executor:
            futures = set()
            futures.add(executor.submit(self.worker))
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            for task in done:
                res = task.result()
                print(f"Child pid is {res}")
                for child in active_children():
                    if child.pid == res:
                        print("Child found")
                        child.join()
                        child.close()
                        print("Child closed")

    def worker(self):
        print("Worker called")
        return current_process().pid

ex = Example()
ex.start()

from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        with Pool(processes=1) as pool:
            task = pool.apply_async(self.worker)
            res = task.get()
            print(f"Child pid is {res}")
            for child in active_children():
                if child.pid == res:
                    print("Child found")
                    child.join()
                    child.close()
                    print("Child closed")

    def worker(self):
        print("Worker called")
        return current_process().pid

ex = Example()
ex.start()
Output:
Worker called
Child pid is 284108
Child found
Why?
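A multiprocessing pool consists of one or more processes that take submitted "tasks" from an input queue, run each task to completion, and then go back to the queue for the next task. These processes keep running until you implicitly or explicitly terminate the entire pool by one of the methods described below. The important point is that when you submit a task to the pool (for example with concurrent.futures.ProcessPoolExecutor.submit or multiprocessing.pool.Pool.apply_async), the worker function you pass is executed in an already-running process. That process cannot be joined until it terminates, and it will not terminate unless you take specific action to shut the pool down.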
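To make the "already-running process" point concrete, here is a minimal sketch that submits several tasks to a one-worker pool and collects the pid each task ran in. The helper name collect_worker_pids is made up for illustration, and os.getpid is used as the task only because it is a picklable built-in that reports the worker's pid.

```python
import os
from multiprocessing import Pool


def collect_worker_pids(n_tasks=3):
    """Run os.getpid in a one-worker pool n_tasks times and return the pids."""
    with Pool(processes=1) as pool:
        # Every task is picked up by the same long-lived worker process;
        # finishing a task does not end that process.
        return [pool.apply_async(os.getpid).get() for _ in range(n_tasks)]


# Required for Windows (and any platform that spawns rather than forks):
if __name__ == "__main__":
    pids = collect_worker_pids()
    print(pids)
    print("distinct worker pids:", len(set(pids)))
```

All tasks should report the same pid, which is why joining that process after a single task completes just waits for a process that is still idling on the task queue.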
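There is no reason to call join explicitly on the individual processes of a pool, and doing so makes you block, because those processes do not terminate until the pool itself is shut down. For a concurrent.futures.ProcessPoolExecutor pool, that means calling shutdown(wait=True), which returns once all submitted tasks have completed, or shutdown(wait=False), which returns immediately. For a multiprocessing.pool.Pool pool, it means calling either terminate() or the sequence pool.close() followed by pool.join() (which joins all the pool processes; they terminate once all submitted tasks have completed). But at that point there are no longer any running pool processes left to join. For example, if we call pool.terminate():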
from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        pool = Pool(processes=1)
        task = pool.apply_async(self.worker)
        res = task.get()
        pool.terminate()  # Now there are no more running processes:
        print(f"Child pid is {res}")
        # This will not find any active children:
        for child in active_children():
            if child.pid == res:
                print("Child found")
                child.join()
                child.close()
                print("Child closed")

    def worker(self):
        print("Worker called")
        return current_process().pid

# Required for Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()
Prints:
Worker called
Child pid is 18076
Or, if we wait for all tasks to complete and let the pool processes terminate on their own:
from multiprocessing import current_process, active_children, Pool

class Example:
    def start(self):
        pool = Pool(processes=1)
        task = pool.apply_async(self.worker)
        res = task.get()
        # wait for all tasks to complete:
        pool.close()
        pool.join()
        print(f"Child pid is {res}")
        # This will not find any active children:
        for child in active_children():
            if child.pid == res:
                print("Child found")
                child.join()
                child.close()
                print("Child closed")

    def worker(self):
        print("Worker called")
        return current_process().pid

# Required for Windows:
if __name__ == '__main__':
    ex = Example()
    ex.start()
Prints:
Worker called
Child pid is 19936
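Both examples above use multiprocessing.Pool. The same idea can be sketched for ProcessPoolExecutor: leaving the with block calls shutdown(wait=True), so the executor joins its own worker and nothing is left for you to join by hand. The helper name run_and_shutdown is hypothetical, and os.getpid again stands in for a real worker function.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import active_children


def run_and_shutdown():
    """Return (worker_pid, pids of children still alive after shutdown)."""
    with ProcessPoolExecutor(max_workers=1) as executor:
        worker_pid = executor.submit(os.getpid).result()
    # Leaving the with block has called executor.shutdown(wait=True),
    # which waits for the worker process to exit.
    remaining = [child.pid for child in active_children()]
    return worker_pid, remaining


# Required for Windows:
if __name__ == "__main__":
    worker_pid, remaining = run_and_shutdown()
    print(f"Child pid was {worker_pid}")
    print("Worker still active:", worker_pid in remaining)
```

In other words, let the executor (or pool) manage the lifetime of its workers, and shut the whole thing down once you have the results you need.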