在 Python 中使用进程共享对象时如何在收到 SIGINT 后防止 BrokenPipeErrors?
How to prevent BrokenPipeErrors after receiving a SIGINT while using process shared objects in Python?
这个Python程序:
import concurrent.futures
import multiprocessing
import time
class A:
def __init__(self):
self.event = multiprocessing.Manager().Event()
def start(self):
try:
while True:
if self.event.is_set():
break
print("processing")
time.sleep(1)
except BaseException as e:
print(type(e).__name__ + " (from pool thread):", e)
def shutdown(self):
self.event.set()
if __name__ == "__main__":
try:
a = A()
pool = concurrent.futures.ThreadPoolExecutor(1)
future = pool.submit(a.start)
while not future.done():
concurrent.futures.wait([future], timeout=0.1)
except BaseException as e:
print(type(e).__name__ + " (from main thread):", e)
finally:
a.shutdown()
pool.shutdown()
输出:
processing
processing
processing
KeyboardInterrupt (from main thread):
BrokenPipeError (from pool thread): [WinError 232] The pipe is being closed
Traceback (most recent call last):
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 788, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".\foo.py", line 34, in <module>
a.shutdown()
File ".\foo.py", line 21, in shutdown
self.event.set()
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 1067, in set
return self._callmethod('set')
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 792, in _callmethod
self._connect()
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 779, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 490, in Client
c = PipeClient(address)
File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 691, in PipeClient
_winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified
当它是运行并且三秒后发送SIGINT
信号(通过按Ctrl+C).
Analysis. — SIGINT
信号发送到每个进程的主线程。本例中有两个进程:主进程和管理器的子进程。
- 在主进程的主线程中:接收到
SIGINT
信号后,默认的SIGINT
信号处理程序引发KeyboardInterrupt
异常,该异常被捕获并打印。
- 在管理器子进程的主线程中:同时,在收到
SIGINT
信号后,默认的SIGINT
信号处理程序引发了一个KeyboardInterrupt
异常,该异常终止子进程。因此,其他进程对管理器共享对象的所有后续使用都会引发 BrokenPipeError
异常。
- 在主进程的池子线程中:在这种情况下,在行
if self.event.is_set():
. 处引发了 BrokenPipeError
异常
- 在主进程的主线程中:最后,控制流到达行
a.shutdown()
,这引发了 AttributeError
和 FileNotFoundError
异常。
如何防止此 BrokenPipeError
异常?
此问题的解决方案是使用忽略信号的处理程序覆盖默认的 SIGINT
信号处理程序,例如在管理器的子进程开始时使用 signal.SIG_IGN
standard signal handler. It is possible by calling the signal.signal
函数:
import concurrent.futures
import multiprocessing.managers
import signal
import time
def init():
signal.signal(signal.SIGINT, signal.SIG_IGN)
class A:
def __init__(self):
manager = multiprocessing.managers.SyncManager()
manager.start(init)
self.event = manager.Event()
def start(self):
try:
while True:
if self.event.is_set():
break
print("processing")
time.sleep(1)
except BaseException as e:
print(type(e).__name__ + " (from pool thread):", e)
def shutdown(self):
self.event.set()
if __name__ == "__main__":
try:
a = A()
pool = concurrent.futures.ThreadPoolExecutor(1)
future = pool.submit(a.start)
while not future.done():
concurrent.futures.wait([future], timeout=0.1)
except BaseException as e:
print(type(e).__name__ + " (from main thread):", e)
finally:
a.shutdown()
pool.shutdown()
注意。 — 该程序也适用于 concurrent.futures.ProcessPoolExecutor
。
这个Python程序:
import concurrent.futures
import multiprocessing
import time
class A:
def __init__(self):
self.event = multiprocessing.Manager().Event()
def start(self):
try:
while True:
if self.event.is_set():
break
print("processing")
time.sleep(1)
except BaseException as e:
print(type(e).__name__ + " (from pool thread):", e)
def shutdown(self):
self.event.set()
if __name__ == "__main__":
try:
a = A()
pool = concurrent.futures.ThreadPoolExecutor(1)
future = pool.submit(a.start)
while not future.done():
concurrent.futures.wait([future], timeout=0.1)
except BaseException as e:
print(type(e).__name__ + " (from main thread):", e)
finally:
a.shutdown()
pool.shutdown()
输出:
processing
processing
processing
KeyboardInterrupt (from main thread):
BrokenPipeError (from pool thread): [WinError 232] The pipe is being closed
Traceback (most recent call last):
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 788, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".\foo.py", line 34, in <module>
a.shutdown()
File ".\foo.py", line 21, in shutdown
self.event.set()
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 1067, in set
return self._callmethod('set')
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 792, in _callmethod
self._connect()
File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 779, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 490, in Client
c = PipeClient(address)
File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 691, in PipeClient
_winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified
当它是运行并且三秒后发送SIGINT
信号(通过按Ctrl+C).
Analysis. — SIGINT
信号发送到每个进程的主线程。本例中有两个进程:主进程和管理器的子进程。
- 在主进程的主线程中:接收到
SIGINT
信号后,默认的SIGINT
信号处理程序引发KeyboardInterrupt
异常,该异常被捕获并打印。 - 在管理器子进程的主线程中:同时,在收到
SIGINT
信号后,默认的SIGINT
信号处理程序引发了一个KeyboardInterrupt
异常,该异常终止子进程。因此,其他进程对管理器共享对象的所有后续使用都会引发BrokenPipeError
异常。 - 在主进程的池子线程中:在这种情况下,在行
if self.event.is_set():
. 处引发了 - 在主进程的主线程中:最后,控制流到达行
a.shutdown()
,这引发了AttributeError
和FileNotFoundError
异常。
BrokenPipeError
异常
如何防止此 BrokenPipeError
异常?
此问题的解决方案是使用忽略信号的处理程序覆盖默认的 SIGINT
信号处理程序,例如在管理器的子进程开始时使用 signal.SIG_IGN
standard signal handler. It is possible by calling the signal.signal
函数:
import concurrent.futures
import multiprocessing.managers
import signal
import time
def init():
signal.signal(signal.SIGINT, signal.SIG_IGN)
class A:
def __init__(self):
manager = multiprocessing.managers.SyncManager()
manager.start(init)
self.event = manager.Event()
def start(self):
try:
while True:
if self.event.is_set():
break
print("processing")
time.sleep(1)
except BaseException as e:
print(type(e).__name__ + " (from pool thread):", e)
def shutdown(self):
self.event.set()
if __name__ == "__main__":
try:
a = A()
pool = concurrent.futures.ThreadPoolExecutor(1)
future = pool.submit(a.start)
while not future.done():
concurrent.futures.wait([future], timeout=0.1)
except BaseException as e:
print(type(e).__name__ + " (from main thread):", e)
finally:
a.shutdown()
pool.shutdown()
注意。 — 该程序也适用于 concurrent.futures.ProcessPoolExecutor
。