如何从 class 启动和停止多个子进程?
How to start and stop multiple child processes from a class?
Python程序:
import multiprocessing
import time
class Application:
    """Run one worker process per CPU and stop them all through a shared event."""

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU. Each target is the bound method
        # self._worker, so every Process references self and, through it,
        # this very list of processes.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        """Print the current process name once per second until the event is set."""
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            # NOTE: in the run quoted after this program (spawn start method)
            # the second start() call raises
            # "TypeError: cannot pickle 'weakref' object".
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()
if __name__ == '__main__':
    # Let the workers run for about three seconds, then ask them to stop.
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()
其输出:
starting
starting
Traceback (most recent call last):
File "/Users/maggyero/Desktop/application.py", line 31, in <module>
application.start()
File "/Users/maggyero/Desktop/application.py", line 21, in start
process.start()
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
在函数 Application.__init__ 中,每次调用 multiprocessing.Process(target=self._worker) 都会以实例方法 self._worker 作为 target 参数初始化一个 multiprocessing.Process 实例。self._worker 绑定到 self,而 self 具有实例属性 self._processes。
在函数 Application.start 中,每次调用 process.start() 都会序列化 target 参数,因此也会序列化 self._processes。self._processes 是一个由 multiprocessing.Process 实例组成的列表,这些实例最初都尚未启动。第一次调用 process.start() 启动该列表中的第一个 multiprocessing.Process 实例没有问题,但第二次调用 process.start() 失败。
由此可见,已启动的 multiprocessing.Process 实例无法被序列化。如何解决这个问题?
问题的根源在于 multiprocessing.Process 实例的 start 方法会将其 _popen 实例属性设置为一个 multiprocessing.popen_*.Popen 实例。该实例的初始化会执行以下两个步骤(以及其他步骤):
1. 对于 multiprocessing.popen_spawn_posix.Popen 实例、multiprocessing.popen_spawn_win32.Popen 实例或 multiprocessing.popen_forkserver.Popen 实例,但不包括 multiprocessing.popen_fork.Popen 实例(即对于启动方法 'spawn' 或 'forkserver',但不包括启动方法 'fork'),它会序列化该 multiprocessing.Process 实例,并将其写入父进程用于与子进程通信的管道一端,以便子进程能够执行该 multiprocessing.Process 实例的 run 方法。
2. 它将自己的 finalizer 实例属性设置为一个 multiprocessing.util.Finalize 实例,而后者又将自己的 _weakref 实例属性设置为一个 weakref.ref 实例,用于在解释器退出时关闭父进程用于与子进程通信的管道一端。换句话说,它使 multiprocessing.Process 实例持有一个弱引用。
因此,如果一个 multiprocessing.Process 实例持有对另一个已启动的 multiprocessing.Process 实例的引用,那么它也就持有了一个弱引用(第 2 点),于是启动它将会失败,因为启动时会序列化(第 1 点)该弱引用,而弱引用不可序列化:
import multiprocessing
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn') # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    # process_b now references process_a, so pickling process_b also
    # pickles process_a.
    process_b.foo = process_a
    process_a.start() # creates process_a._popen.finalizer._weakref
    process_b.start() # TypeError: cannot pickle 'weakref' object
显示序列化问题的最小 Python 程序:
import pickle
import weakref
# Smallest reproduction: pickling a weakref.ref object fails on its own.
pickle.dumps(weakref.ref(int)) # TypeError: cannot pickle 'weakref' object
避免序列化问题的两种解决方法:
- 要么将 target 参数设为 classmethod,这样它就不会绑定到 self(尤其不会带上实例属性 self._processes):
class Application:
    """Run one worker process per CPU and stop them all through a shared event.

    The process target is a classmethod, so pickling it for the 'spawn' or
    'forkserver' start method does not drag the instance (and its list of
    already-started processes) along. The stop event is handed to each
    worker explicitly through ``args``.
    """

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU, each receiving the shared event.
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        """Print the current process name once per second until *event* is set.

        Args:
            event: The shared multiprocessing event that signals shutdown.
        """
        # Use the event passed in as an argument: a classmethod receives the
        # class, not an instance, so there is no ``_event`` attribute to read.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()
- 或者通过 __getstate__ 将实例属性 self._processes 专门排除在 target 参数的序列化之外:
class Application:
    """Run one worker process per CPU and stop them all through a shared event.

    The process target is the bound method ``self._worker``, so starting a
    process with the 'spawn' or 'forkserver' start method pickles this
    instance. ``__getstate__`` leaves ``_processes`` out of the pickled state
    because already-started processes cannot be pickled.
    """

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU, all targeting the bound worker method.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        """Print the current process name once per second until the event is set."""
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()

    def __getstate__(self):
        """Return the instance attributes to pickle, excluding ``_processes``.

        Returns:
            A new dict with every instance attribute except ``_processes``.
        """
        return {
            key: value
            for key, value in vars(self).items()
            if key != '_processes'
        }
Python程序:
import multiprocessing
import time
class Application:
    """Run one worker process per CPU and stop them all through a shared event."""

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU. Each target is the bound method
        # self._worker, so every Process references self and, through it,
        # this very list of processes.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        """Print the current process name once per second until the event is set."""
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            # NOTE: in the run quoted after this program (spawn start method)
            # the second start() call raises
            # "TypeError: cannot pickle 'weakref' object".
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()
if __name__ == '__main__':
    # Let the workers run for about three seconds, then ask them to stop.
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()
其输出:
starting
starting
Traceback (most recent call last):
File "/Users/maggyero/Desktop/application.py", line 31, in <module>
application.start()
File "/Users/maggyero/Desktop/application.py", line 21, in start
process.start()
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
在函数 Application.__init__ 中,每次调用 multiprocessing.Process(target=self._worker) 都会以实例方法 self._worker 作为 target 参数初始化一个 multiprocessing.Process 实例。self._worker 绑定到 self,而 self 具有实例属性 self._processes。
在函数 Application.start 中,每次调用 process.start() 都会序列化 target 参数,因此也会序列化 self._processes。self._processes 是一个由 multiprocessing.Process 实例组成的列表,这些实例最初都尚未启动。第一次调用 process.start() 启动该列表中的第一个 multiprocessing.Process 实例没有问题,但第二次调用 process.start() 失败。
由此可见,已启动的 multiprocessing.Process 实例无法被序列化。如何解决这个问题?
问题的根源在于 multiprocessing.Process 实例的 start 方法会将其 _popen 实例属性设置为一个 multiprocessing.popen_*.Popen 实例。该实例的初始化会执行以下两个步骤(以及其他步骤):
1. 对于 multiprocessing.popen_spawn_posix.Popen 实例、multiprocessing.popen_spawn_win32.Popen 实例或 multiprocessing.popen_forkserver.Popen 实例,但不包括 multiprocessing.popen_fork.Popen 实例(即对于启动方法 'spawn' 或 'forkserver',但不包括启动方法 'fork'),它会序列化该 multiprocessing.Process 实例,并将其写入父进程用于与子进程通信的管道一端,以便子进程能够执行该 multiprocessing.Process 实例的 run 方法。
2. 它将自己的 finalizer 实例属性设置为一个 multiprocessing.util.Finalize 实例,而后者又将自己的 _weakref 实例属性设置为一个 weakref.ref 实例,用于在解释器退出时关闭父进程用于与子进程通信的管道一端。换句话说,它使 multiprocessing.Process 实例持有一个弱引用。
因此,如果一个 multiprocessing.Process 实例持有对另一个已启动的 multiprocessing.Process 实例的引用,那么它也就持有了一个弱引用(第 2 点),于是启动它将会失败,因为启动时会序列化(第 1 点)该弱引用,而弱引用不可序列化:
import multiprocessing
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn') # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    # process_b now references process_a, so pickling process_b also
    # pickles process_a.
    process_b.foo = process_a
    process_a.start() # creates process_a._popen.finalizer._weakref
    process_b.start() # TypeError: cannot pickle 'weakref' object
显示序列化问题的最小 Python 程序:
import pickle
import weakref
# Smallest reproduction: pickling a weakref.ref object fails on its own.
pickle.dumps(weakref.ref(int)) # TypeError: cannot pickle 'weakref' object
避免序列化问题的两种解决方法:
- 要么将 target 参数设为 classmethod,这样它就不会绑定到 self(尤其不会带上实例属性 self._processes):
class Application:
    """Run one worker process per CPU and stop them all through a shared event.

    The process target is a classmethod, so pickling it for the 'spawn' or
    'forkserver' start method does not drag the instance (and its list of
    already-started processes) along. The stop event is handed to each
    worker explicitly through ``args``.
    """

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU, each receiving the shared event.
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        """Print the current process name once per second until *event* is set.

        Args:
            event: The shared multiprocessing event that signals shutdown.
        """
        # Use the event passed in as an argument: a classmethod receives the
        # class, not an instance, so there is no ``_event`` attribute to read.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()
- 或者通过 __getstate__ 将实例属性 self._processes 专门排除在 target 参数的序列化之外:
class Application:
    """Run one worker process per CPU and stop them all through a shared event.

    The process target is the bound method ``self._worker``, so starting a
    process with the 'spawn' or 'forkserver' start method pickles this
    instance. ``__getstate__`` leaves ``_processes`` out of the pickled state
    because already-started processes cannot be pickled.
    """

    def __init__(self):
        # Flag polled by every worker; stop() sets it to end their loops.
        self._event = multiprocessing.Event()
        # One unstarted process per CPU, all targeting the bound worker method.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        """Print the current process name once per second until the event is set."""
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker process to exit."""
        self._event.set()
        for process in self._processes:
            process.join()

    def __getstate__(self):
        """Return the instance attributes to pickle, excluding ``_processes``.

        Returns:
            A new dict with every instance attribute except ``_processes``.
        """
        return {
            key: value
            for key, value in vars(self).items()
            if key != '_processes'
        }