How to gracefully exit a program using ProcessPoolExecutor?
Take the following program as an example:
import asyncio
from concurrent.futures import ProcessPoolExecutor

def process():
    print('processed')

async def main(loop, executor):
    await loop.run_in_executor(executor, process)
    await asyncio.sleep(60.0)

executor = ProcessPoolExecutor()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop, executor))
except KeyboardInterrupt:
    pass
finally:
    executor.shutdown()
If I press Ctrl + C while the program is running, I get a real mess of tracebacks as it exits:
processed
^CProcess Process-3:
Process Process-4:
Process Process-2:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/concurrent/futures/process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/queues.py", line 93, in get
with self._rlock:
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/synchronize.py", line 96, in __enter__
return self._semlock.__enter__()
Traceback (most recentTraceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/concurrent/futures/process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versi call last):
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/concurrent/futures/process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/queues.py", line 93, in get
with self._rlock:
File "/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/pytho
n3.5/multiprocessing/synchronize.py", line 96, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
..... (It goes on for a while longer)
Is there a more graceful way to handle KeyboardInterrupt in a program that uses a multiprocessing pool?
Not sure if this is the right (or only) solution, but I usually add an explicit SIGINT signal handler rather than relying on the interpreter's default behaviour of raising KeyboardInterrupt on SIGINT. That gives you more control and, hopefully, avoids unintended side effects.
Updated per @germn's suggestion:
import asyncio
import signal
from concurrent.futures import ProcessPoolExecutor

def shutdown(loop, executor):
    executor.shutdown()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()

def process():
    print('processed')

async def main(loop, executor):
    await loop.run_in_executor(executor, process)
    loop.create_task(asyncio.sleep(120))
    loop.create_task(asyncio.sleep(12))
    loop.create_task(asyncio.sleep(130))
    await asyncio.sleep(60.0)

executor = ProcessPoolExecutor()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, shutdown, loop, executor)
loop.run_until_complete(main(loop, executor))
loop.close()
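
For reference, another way to keep the worker tracebacks out of the output is to have each worker ignore SIGINT, so that Ctrl + C only raises KeyboardInterrupt in the parent process. This is not from the original answer; it is a minimal sketch that assumes Python 3.7+, where ProcessPoolExecutor accepts the initializer/initargs parameters:

import asyncio
import signal
from concurrent.futures import ProcessPoolExecutor

def ignore_sigint():
    # Runs once in every worker process: Ctrl + C no longer interrupts the workers.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process():
    print('processed')

async def main(loop, executor):
    await loop.run_in_executor(executor, process)
    await asyncio.sleep(60.0)

# Assumes Python 3.7+: the initializer keyword does not exist in 3.5/3.6.
executor = ProcessPoolExecutor(initializer=ignore_sigint)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop, executor))
except KeyboardInterrupt:
    pass
finally:
    executor.shutdown()
    loop.close()

Since Ctrl + C sends SIGINT to the whole foreground process group, ignoring it in the workers keeps them alive so that executor.shutdown() in the parent can wind them down without the interleaved tracebacks.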