打印功能使多处理程序失败
A print function makes a multiprocessing program fail
在下面的代码中,我试图创建一个沙盒主从系统,其中对一个工作人员的全局变量的更改不会反映到其他工作人员。
为此,每次创建任务时都会创建一个新进程,并且为了使执行并行,进程本身的创建由 ThreadPoolExecutor
管理。
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pipe, Process
def task(conn, arg):
conn.send(arg * 2)
def isolate_fn(fn, arg):
def wrapped():
parent_conn, child_conn = Pipe()
p = Process(target=fn, args=(child_conn, arg), daemon=True)
try:
p.start()
r = parent_conn.recv()
finally:
p.join()
return r
return wrapped
def main():
with ThreadPoolExecutor(max_workers=4) as executor:
pair = []
for i in range(0, 10):
pair.append((i, executor.submit(isolate_fn(task, i))))
# This function makes the program broken.
#
print('foo')
time.sleep(2)
for arg, future in pair:
if future.done():
print('arg: {}, res: {}'.format(arg, future.result()))
else:
print('not finished: {}'.format(arg))
print('finished')
main()
这个程序运行良好,直到我将 print('foo')
函数放入循环中。如果该函数存在,一些任务仍未完成,更糟糕的是,该程序本身并没有完成。
结果并不总是相同,但以下是典型的输出:
foo
foo
foo
foo
foo
foo
foo
foo
foo
foo
arg: 0, res: 0
arg: 1, res: 2
arg: 2, res: 4
not finished: 3
not finished: 4
not finished: 5
not finished: 6
not finished: 7
not finished: 8
not finished: 9
为什么这个程序如此脆弱?
我用的是Python3.4.5.
您不是每次都创建 ThreadPoolExecutor ,而是为每次迭代使用预初始化的池。我真的无法跟踪哪个打印语句阻碍了您?
尝试使用
from multiprocessing import set_start_method
... rest of your code here ....
if __name__ == '__main__':
set_start_method('spawn')
main()
如果您在 Whosebug 上搜索 python 多处理和多线程,您会发现很多问题都提到了类似的悬而未决的问题。 (特别是 python 版本 2.7 和 3.2)
混合多线程和多处理仍然是一个问题,甚至 multiprocessing.set_start_method 的 python 文档也提到了这一点。在你的情况下 'spawn' 和 'forkserver' 应该没有任何问题。
另一种选择可能是直接使用 MultiProcessingPool,但在更复杂的用例中这对您来说可能是不可能的。
顺便说一句。 'Not Finished' 可能仍会出现在您的输出中,因为您没有等待子进程完成,但整个代码不应再挂起并始终干净地完成。
在下面的代码中,我试图创建一个沙盒主从系统,其中对一个工作人员的全局变量的更改不会反映到其他工作人员。
为此,每次创建任务时都会创建一个新进程,并且为了使执行并行,进程本身的创建由 ThreadPoolExecutor
管理。
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pipe, Process
def task(conn, arg):
conn.send(arg * 2)
def isolate_fn(fn, arg):
def wrapped():
parent_conn, child_conn = Pipe()
p = Process(target=fn, args=(child_conn, arg), daemon=True)
try:
p.start()
r = parent_conn.recv()
finally:
p.join()
return r
return wrapped
def main():
with ThreadPoolExecutor(max_workers=4) as executor:
pair = []
for i in range(0, 10):
pair.append((i, executor.submit(isolate_fn(task, i))))
# This function makes the program broken.
#
print('foo')
time.sleep(2)
for arg, future in pair:
if future.done():
print('arg: {}, res: {}'.format(arg, future.result()))
else:
print('not finished: {}'.format(arg))
print('finished')
main()
这个程序运行良好,直到我将 print('foo')
函数放入循环中。如果该函数存在,一些任务仍未完成,更糟糕的是,该程序本身并没有完成。
结果并不总是相同,但以下是典型的输出:
foo
foo
foo
foo
foo
foo
foo
foo
foo
foo
arg: 0, res: 0
arg: 1, res: 2
arg: 2, res: 4
not finished: 3
not finished: 4
not finished: 5
not finished: 6
not finished: 7
not finished: 8
not finished: 9
为什么这个程序如此脆弱?
我用的是Python3.4.5.
您不是每次都创建 ThreadPoolExecutor ,而是为每次迭代使用预初始化的池。我真的无法跟踪哪个打印语句阻碍了您?
尝试使用
from multiprocessing import set_start_method
... rest of your code here ....
if __name__ == '__main__':
set_start_method('spawn')
main()
如果您在 Whosebug 上搜索 python 多处理和多线程,您会发现很多问题都提到了类似的悬而未决的问题。 (特别是 python 版本 2.7 和 3.2)
混合多线程和多处理仍然是一个问题,甚至 multiprocessing.set_start_method 的 python 文档也提到了这一点。在你的情况下 'spawn' 和 'forkserver' 应该没有任何问题。
另一种选择可能是直接使用 MultiProcessingPool,但在更复杂的用例中这对您来说可能是不可能的。
顺便说一句。 'Not Finished' 可能仍会出现在您的输出中,因为您没有等待子进程完成,但整个代码不应再挂起并始终干净地完成。