处理多处理池中的工人死亡
Handling worker death in multiprocessing Pool
我有一个简单的服务器:
from multiprocessing import Pool, TimeoutError
import time
import os
if __name__ == '__main__':
# start worker processes
pool = Pool(processes=1)
while True:
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
try:
print(res.get(timeout=1)) # prints the PID of that process
except TimeoutError:
print('worker timed out')
time.sleep(5)
pool.close()
print("Now the pool is closed and no longer available")
pool.join()
print("Done")
如果我 运行 这样我会得到类似的东西:
47292
47292
然后我 kill 47292
当服务器正在 运行ning 时。启动了一个新的工作进程,但服务器的输出是:
47292
47292
worker timed out
worker timed out
worker timed out
池仍在尝试向旧工作进程发送请求。
我已经完成了一些在服务器和 worker 中捕获信号的工作,我的行为稍微好一些,但服务器似乎仍在等待死机 children 关闭(即 pool.join() 永远不会结束) 一个工人被杀后。
处理工人死亡的正确方法是什么?
只有 none 的工作人员死亡时,从服务器进程正常关闭工作人员似乎才有效。
(在 Python 3.4.4 上,但如果有帮助,我很乐意升级。)
更新:
有趣的是,如果使用 processes=2 创建池并且您杀死一个工作进程,等待几秒钟并杀死另一个,则不会发生此工作超时问题。但是,如果您快速连续地终止两个工作进程,那么 "worker timed out" 问题会再次出现。
可能相关的是,当问题发生时,杀死服务器进程将离开工作进程运行ning。
此行为来自 multiprocessing.Pool
的设计。当你杀死一名工人时,你可能会杀死持有 call_queue.rlock
的人。当这个进程在持有锁的同时被杀死时,没有其他进程能够再读取 call_queue
,破坏 Pool
因为它不能再与它的工人通信。
所以实际上没有办法杀死一个工人并确保你的 Pool
之后仍然可以,因为你可能会陷入僵局。
multiprocessing.Pool
不处理工人死亡。您可以尝试使用 concurrent.futures.ProcessPoolExecutor
代替(API 略有不同),它默认处理进程失败。当一个进程在 ProcessPoolExecutor
中结束时,整个执行程序将关闭,您会返回一个 BrokenProcessPool
错误。
请注意,此实现中还有其他死锁,应在 loky
中修复。 (免责声明: 我是这个库的维护者)。此外,loky
允许您使用 ReusablePoolExecutor
和方法 _resize
调整现有 executor
的大小。如果您有兴趣,请告诉我,我可以从这个包开始为您提供一些帮助。 (我意识到我们仍然需要在文档方面做一些工作...0_0)
我有一个简单的服务器:
from multiprocessing import Pool, TimeoutError
import time
import os
if __name__ == '__main__':
# start worker processes
pool = Pool(processes=1)
while True:
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
try:
print(res.get(timeout=1)) # prints the PID of that process
except TimeoutError:
print('worker timed out')
time.sleep(5)
pool.close()
print("Now the pool is closed and no longer available")
pool.join()
print("Done")
如果我 运行 这样我会得到类似的东西:
47292
47292
然后我 kill 47292
当服务器正在 运行ning 时。启动了一个新的工作进程,但服务器的输出是:
47292
47292
worker timed out
worker timed out
worker timed out
池仍在尝试向旧工作进程发送请求。
我已经完成了一些在服务器和 worker 中捕获信号的工作,我的行为稍微好一些,但服务器似乎仍在等待死机 children 关闭(即 pool.join() 永远不会结束) 一个工人被杀后。
处理工人死亡的正确方法是什么?
只有 none 的工作人员死亡时,从服务器进程正常关闭工作人员似乎才有效。
(在 Python 3.4.4 上,但如果有帮助,我很乐意升级。)
更新: 有趣的是,如果使用 processes=2 创建池并且您杀死一个工作进程,等待几秒钟并杀死另一个,则不会发生此工作超时问题。但是,如果您快速连续地终止两个工作进程,那么 "worker timed out" 问题会再次出现。
可能相关的是,当问题发生时,杀死服务器进程将离开工作进程运行ning。
此行为来自 multiprocessing.Pool
的设计。当你杀死一名工人时,你可能会杀死持有 call_queue.rlock
的人。当这个进程在持有锁的同时被杀死时,没有其他进程能够再读取 call_queue
,破坏 Pool
因为它不能再与它的工人通信。
所以实际上没有办法杀死一个工人并确保你的 Pool
之后仍然可以,因为你可能会陷入僵局。
multiprocessing.Pool
不处理工人死亡。您可以尝试使用 concurrent.futures.ProcessPoolExecutor
代替(API 略有不同),它默认处理进程失败。当一个进程在 ProcessPoolExecutor
中结束时,整个执行程序将关闭,您会返回一个 BrokenProcessPool
错误。
请注意,此实现中还有其他死锁,应在 loky
中修复。 (免责声明: 我是这个库的维护者)。此外,loky
允许您使用 ReusablePoolExecutor
和方法 _resize
调整现有 executor
的大小。如果您有兴趣,请告诉我,我可以从这个包开始为您提供一些帮助。 (我意识到我们仍然需要在文档方面做一些工作...0_0)