事件循环中断后清理的正确方法是什么?
What's the correct way to clean up after an interrupted event loop?
我有一个事件循环,它作为命令行工具的一部分运行一些协同例程。用户可能会使用通常的 Ctrl + C 中断该工具,此时我想在中断的事件循环后正确清理。
这是我试过的。
import asyncio
@asyncio.coroutine
def shleepy_time(seconds):
print("Shleeping for {s} seconds...".format(s=seconds))
yield from asyncio.sleep(seconds)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Side note: Apparently, async() will be deprecated in 3.4.4.
# See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
tasks = [
asyncio.async(shleepy_time(seconds=5)),
asyncio.async(shleepy_time(seconds=10))
]
try:
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt as e:
print("Caught keyboard interrupt. Canceling tasks...")
# This doesn't seem to be the correct solution.
for t in tasks:
t.cancel()
finally:
loop.close()
运行 并点击 Ctrl + C 得到:
$ python3 asyncio-keyboardinterrupt-example.py
Shleeping for 5 seconds...
Shleeping for 10 seconds...
^CCaught keyboard interrupt. Canceling tasks...
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(0)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
很明显,我没有正确清理。我想也许在任务上调用 cancel()
是完成任务的方法。
事件循环中断后清理的正确方法是什么?
除非你在 Windows,为 SIGINT 设置基于事件循环的信号处理程序(以及 SIGTERM,这样你就可以 运行 它作为一项服务)。在这些处理程序中,您可以立即退出事件循环,也可以启动某种清理序列并稍后退出。
官方 Python 文档中的示例:https://docs.python.org/3.4/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
当您按 CTRL+C 时,事件循环会停止,因此您对 t.cancel()
的调用实际上并没有生效。对于要取消的任务,您需要重新开始循环。
以下是处理方法:
import asyncio
@asyncio.coroutine
def shleepy_time(seconds):
print("Shleeping for {s} seconds...".format(s=seconds))
yield from asyncio.sleep(seconds)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Side note: Apparently, async() will be deprecated in 3.4.4.
# See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
tasks = asyncio.gather(
asyncio.async(shleepy_time(seconds=5)),
asyncio.async(shleepy_time(seconds=10))
)
try:
loop.run_until_complete(tasks)
except KeyboardInterrupt as e:
print("Caught keyboard interrupt. Canceling tasks...")
tasks.cancel()
loop.run_forever()
tasks.exception()
finally:
loop.close()
一旦捕获到 KeyboardInterrupt
,我们就会调用 tasks.cancel()
,然后再次启动 loop
。 run_forever
实际上会在 tasks
被取消后立即退出(请注意,取消 asyncio.gather
返回的 Future
也会取消其中的所有 Futures
),因为中断的 loop.run_until_complete
调用添加了一个 done_callback
到 tasks
来停止循环。因此,当我们取消 tasks
时,该回调将触发,循环停止。那时我们调用 tasks.exception
,只是为了避免收到关于未从 _GatheringFuture
.
获取异常的警告
Python 3.7+注意事项:下面是标准库 asyncio.run
函数的一部分 – 将下面替换为 sys.exit(loop.run(amain(loop)))
一旦你准备好升级! (如果您想打印消息,只需将 try…except
子句移至 amain
。)
更新 Python 3.6+:添加对 loop.shutdown_asyncgens
的调用以避免未完全使用的异步生成器造成内存泄漏。
受其他一些答案的启发,以下解决方案应该适用于几乎所有情况,并且不依赖于您手动跟踪需要在 Ctrl[=41 上清理的任务=]+C:
loop = asyncio.get_event_loop()
try:
# Here `amain(loop)` is the core coroutine that may spawn any
# number of tasks
sys.exit(loop.run_until_complete(amain(loop)))
except KeyboardInterrupt:
# Optionally show a message if the shutdown may take a while
print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
# Do not show `asyncio.CancelledError` exceptions during shutdown
# (a lot of these may be generated, skip this if you prefer to see them)
def shutdown_exception_handler(loop, context):
if "exception" not in context \
or not isinstance(context["exception"], asyncio.CancelledError):
loop.default_exception_handler(context)
loop.set_exception_handler(shutdown_exception_handler)
# Handle shutdown gracefully by waiting for all tasks to be cancelled
tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
tasks.add_done_callback(lambda t: loop.stop())
tasks.cancel()
# Keep the event loop running until it is either destroyed or all
# tasks have really terminated
while not tasks.done() and not loop.is_closed():
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
以上代码将使用 asyncio.Task.all_tasks
从事件循环中获取所有当前任务,并使用 asyncio.gather
将它们放在一个组合的未来中。然后使用未来的 .cancel()
方法取消该未来的所有任务(当前都是 运行 任务)。 return_exceptions=True
然后确保所有收到的 asyncio.CancelledError
异常都被存储而不是导致未来变得错误。
以上代码还将覆盖默认的异常处理程序,以防止生成的 asyncio.CancelledError
异常被记录。
2020-12-17 更新:删除了 Python 3.5.
的兼容性代码
在Python 3.7+中建议您使用asyncio.run
启动异步主函数。
asyncio.run
将负责为您的程序创建事件循环,并确保在主函数退出时关闭事件循环并清除所有任务(包括由于 KeyboardInterrupt
异常).
大致类比如下(见asyncio/runners.py
):
def run(coro, *, debug=False):
"""`asyncio.run` is new in Python 3.7"""
loop = asyncio.get_event_loop()
try:
loop.set_debug(debug)
return loop.run_until_complete(coro)
finally:
try:
all_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
all_tasks.cancel()
with contextlib.suppress(asyncio.CancelledError):
loop.run_until_complete(all_tasks)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
使用 signal
模块在 signal.SIGINT
信号 (Ctrl + C) 上设置 asyncio.Event
可以是告诉所有异步代码自然停止的简洁方法。这一点尤其重要,因为一些库如 aiohttp
need a chance to be run cleanup tasks before the event loop closes.
这是一个使用 aiohttp
库的示例。有一个asyncio.sleep(5)
防止连接返回池,让用户有机会ctrl+c模拟一个KeyboardInterrupt
异常
示例代码:
import logging
import asyncio
import signal
import random
import aiohttp
logging.basicConfig(level="INFO", format="%(asctime)s %(threadName)-10s %(name)-10s %(levelname)-8s: %(message)s")
logger = logging.getLogger("root")
stop_event = asyncio.Event()
async def get_json(aiohttp_session):
logger.info("making http request")
params = {"value": random.randint(0,1000) }
async with aiohttp_session.get(f'https://httpbin.org/get', params=params) as response:
# async with response:
j = await response.json()
logger.info("get data: `%s`", j["args"])
await asyncio.sleep(5)
async def run():
while not stop_event.is_set():
async with aiohttp.ClientSession() as aiohttp_session:
await get_json(aiohttp_session)
logger.info("stop event was set, sleeping to let aiohttp close it's connections")
await asyncio.sleep(0.1)
logger.info("sleep finished, returning")
def inner_ctrl_c_signal_handler(sig, frame):
'''
function that gets called when the user issues a
keyboard interrupt (ctrl+c)
'''
logger.info("SIGINT caught!")
stop_event.set()
# experiment with commenting out this line and ctrl+c-ing the script
# to see how you get an "event loop is closed" error
signal.signal(signal.SIGINT, inner_ctrl_c_signal_handler)
asyncio.run(run())
没有 signal.signal
调用:
> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:21:08,684 MainThread root INFO : making http request
2021-03-06 22:21:09,132 MainThread root INFO : get data: `{'value': '500'}`
Traceback (most recent call last):
File "C:\Users\auror\Temp\test_aiohttp.py", line 52, in <module>
asyncio.run(run())
File "c:\python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "c:\python39\lib\asyncio\base_events.py", line 629, in run_until_complete
self.run_forever()
File "c:\python39\lib\asyncio\windows_events.py", line 316, in run_forever
super().run_forever()
File "c:\python39\lib\asyncio\base_events.py", line 596, in run_forever
self._run_once()
File "c:\python39\lib\asyncio\base_events.py", line 1854, in _run_once
event_list = self._selector.select(timeout)
File "c:\python39\lib\asyncio\windows_events.py", line 434, in select
self._poll(timeout)
File "c:\python39\lib\asyncio\windows_events.py", line 783, in _poll
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
KeyboardInterrupt
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001CFFD75BB80>
Traceback (most recent call last):
File "c:\python39\lib\asyncio\proactor_events.py", line 116, in __del__
self.close()
File "c:\python39\lib\asyncio\proactor_events.py", line 108, in close
self._loop.call_soon(self._call_connection_lost, None)
File "c:\python39\lib\asyncio\base_events.py", line 746, in call_soon
self._check_closed()
File "c:\python39\lib\asyncio\base_events.py", line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
有了它:
> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:20:29,656 MainThread root INFO : making http request
2021-03-06 22:20:30,106 MainThread root INFO : get data: `{'value': '367'}`
2021-03-06 22:20:35,122 MainThread root INFO : making http request
2021-03-06 22:20:35,863 MainThread root INFO : get data: `{'value': '489'}`
2021-03-06 22:20:38,695 MainThread root INFO : SIGINT caught!
2021-03-06 22:20:40,867 MainThread root INFO : stop event was set, sleeping to let aiohttp close it's connections
2021-03-06 22:20:40,962 MainThread root INFO : sleep finished, returning
我有一个事件循环,它作为命令行工具的一部分运行一些协同例程。用户可能会使用通常的 Ctrl + C 中断该工具,此时我想在中断的事件循环后正确清理。
这是我试过的。
import asyncio
@asyncio.coroutine
def shleepy_time(seconds):
print("Shleeping for {s} seconds...".format(s=seconds))
yield from asyncio.sleep(seconds)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Side note: Apparently, async() will be deprecated in 3.4.4.
# See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
tasks = [
asyncio.async(shleepy_time(seconds=5)),
asyncio.async(shleepy_time(seconds=10))
]
try:
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt as e:
print("Caught keyboard interrupt. Canceling tasks...")
# This doesn't seem to be the correct solution.
for t in tasks:
t.cancel()
finally:
loop.close()
运行 并点击 Ctrl + C 得到:
$ python3 asyncio-keyboardinterrupt-example.py
Shleeping for 5 seconds...
Shleeping for 10 seconds...
^CCaught keyboard interrupt. Canceling tasks...
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(0)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
很明显,我没有正确清理。我想也许在任务上调用 cancel()
是完成任务的方法。
事件循环中断后清理的正确方法是什么?
除非你在 Windows,为 SIGINT 设置基于事件循环的信号处理程序(以及 SIGTERM,这样你就可以 运行 它作为一项服务)。在这些处理程序中,您可以立即退出事件循环,也可以启动某种清理序列并稍后退出。
官方 Python 文档中的示例:https://docs.python.org/3.4/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
当您按 CTRL+C 时,事件循环会停止,因此您对 t.cancel()
的调用实际上并没有生效。对于要取消的任务,您需要重新开始循环。
以下是处理方法:
import asyncio
@asyncio.coroutine
def shleepy_time(seconds):
print("Shleeping for {s} seconds...".format(s=seconds))
yield from asyncio.sleep(seconds)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Side note: Apparently, async() will be deprecated in 3.4.4.
# See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
tasks = asyncio.gather(
asyncio.async(shleepy_time(seconds=5)),
asyncio.async(shleepy_time(seconds=10))
)
try:
loop.run_until_complete(tasks)
except KeyboardInterrupt as e:
print("Caught keyboard interrupt. Canceling tasks...")
tasks.cancel()
loop.run_forever()
tasks.exception()
finally:
loop.close()
一旦捕获到 KeyboardInterrupt
,我们就会调用 tasks.cancel()
,然后再次启动 loop
。 run_forever
实际上会在 tasks
被取消后立即退出(请注意,取消 asyncio.gather
返回的 Future
也会取消其中的所有 Futures
),因为中断的 loop.run_until_complete
调用添加了一个 done_callback
到 tasks
来停止循环。因此,当我们取消 tasks
时,该回调将触发,循环停止。那时我们调用 tasks.exception
,只是为了避免收到关于未从 _GatheringFuture
.
Python 3.7+注意事项:下面是标准库 asyncio.run
函数的一部分 – 将下面替换为 sys.exit(loop.run(amain(loop)))
一旦你准备好升级! (如果您想打印消息,只需将 try…except
子句移至 amain
。)
更新 Python 3.6+:添加对 loop.shutdown_asyncgens
的调用以避免未完全使用的异步生成器造成内存泄漏。
受其他一些答案的启发,以下解决方案应该适用于几乎所有情况,并且不依赖于您手动跟踪需要在 Ctrl[=41 上清理的任务=]+C:
loop = asyncio.get_event_loop()
try:
# Here `amain(loop)` is the core coroutine that may spawn any
# number of tasks
sys.exit(loop.run_until_complete(amain(loop)))
except KeyboardInterrupt:
# Optionally show a message if the shutdown may take a while
print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
# Do not show `asyncio.CancelledError` exceptions during shutdown
# (a lot of these may be generated, skip this if you prefer to see them)
def shutdown_exception_handler(loop, context):
if "exception" not in context \
or not isinstance(context["exception"], asyncio.CancelledError):
loop.default_exception_handler(context)
loop.set_exception_handler(shutdown_exception_handler)
# Handle shutdown gracefully by waiting for all tasks to be cancelled
tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
tasks.add_done_callback(lambda t: loop.stop())
tasks.cancel()
# Keep the event loop running until it is either destroyed or all
# tasks have really terminated
while not tasks.done() and not loop.is_closed():
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
以上代码将使用 asyncio.Task.all_tasks
从事件循环中获取所有当前任务,并使用 asyncio.gather
将它们放在一个组合的未来中。然后使用未来的 .cancel()
方法取消该未来的所有任务(当前都是 运行 任务)。 return_exceptions=True
然后确保所有收到的 asyncio.CancelledError
异常都被存储而不是导致未来变得错误。
以上代码还将覆盖默认的异常处理程序,以防止生成的 asyncio.CancelledError
异常被记录。
2020-12-17 更新:删除了 Python 3.5.
的兼容性代码在Python 3.7+中建议您使用asyncio.run
启动异步主函数。
asyncio.run
将负责为您的程序创建事件循环,并确保在主函数退出时关闭事件循环并清除所有任务(包括由于 KeyboardInterrupt
异常).
大致类比如下(见asyncio/runners.py
):
def run(coro, *, debug=False):
"""`asyncio.run` is new in Python 3.7"""
loop = asyncio.get_event_loop()
try:
loop.set_debug(debug)
return loop.run_until_complete(coro)
finally:
try:
all_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
all_tasks.cancel()
with contextlib.suppress(asyncio.CancelledError):
loop.run_until_complete(all_tasks)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
使用 signal
模块在 signal.SIGINT
信号 (Ctrl + C) 上设置 asyncio.Event
可以是告诉所有异步代码自然停止的简洁方法。这一点尤其重要,因为一些库如 aiohttp
need a chance to be run cleanup tasks before the event loop closes.
这是一个使用 aiohttp
库的示例。有一个asyncio.sleep(5)
防止连接返回池,让用户有机会ctrl+c模拟一个KeyboardInterrupt
异常
示例代码:
import logging
import asyncio
import signal
import random
import aiohttp
logging.basicConfig(level="INFO", format="%(asctime)s %(threadName)-10s %(name)-10s %(levelname)-8s: %(message)s")
logger = logging.getLogger("root")
stop_event = asyncio.Event()
async def get_json(aiohttp_session):
logger.info("making http request")
params = {"value": random.randint(0,1000) }
async with aiohttp_session.get(f'https://httpbin.org/get', params=params) as response:
# async with response:
j = await response.json()
logger.info("get data: `%s`", j["args"])
await asyncio.sleep(5)
async def run():
while not stop_event.is_set():
async with aiohttp.ClientSession() as aiohttp_session:
await get_json(aiohttp_session)
logger.info("stop event was set, sleeping to let aiohttp close it's connections")
await asyncio.sleep(0.1)
logger.info("sleep finished, returning")
def inner_ctrl_c_signal_handler(sig, frame):
'''
function that gets called when the user issues a
keyboard interrupt (ctrl+c)
'''
logger.info("SIGINT caught!")
stop_event.set()
# experiment with commenting out this line and ctrl+c-ing the script
# to see how you get an "event loop is closed" error
signal.signal(signal.SIGINT, inner_ctrl_c_signal_handler)
asyncio.run(run())
没有 signal.signal
调用:
> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:21:08,684 MainThread root INFO : making http request
2021-03-06 22:21:09,132 MainThread root INFO : get data: `{'value': '500'}`
Traceback (most recent call last):
File "C:\Users\auror\Temp\test_aiohttp.py", line 52, in <module>
asyncio.run(run())
File "c:\python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "c:\python39\lib\asyncio\base_events.py", line 629, in run_until_complete
self.run_forever()
File "c:\python39\lib\asyncio\windows_events.py", line 316, in run_forever
super().run_forever()
File "c:\python39\lib\asyncio\base_events.py", line 596, in run_forever
self._run_once()
File "c:\python39\lib\asyncio\base_events.py", line 1854, in _run_once
event_list = self._selector.select(timeout)
File "c:\python39\lib\asyncio\windows_events.py", line 434, in select
self._poll(timeout)
File "c:\python39\lib\asyncio\windows_events.py", line 783, in _poll
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
KeyboardInterrupt
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001CFFD75BB80>
Traceback (most recent call last):
File "c:\python39\lib\asyncio\proactor_events.py", line 116, in __del__
self.close()
File "c:\python39\lib\asyncio\proactor_events.py", line 108, in close
self._loop.call_soon(self._call_connection_lost, None)
File "c:\python39\lib\asyncio\base_events.py", line 746, in call_soon
self._check_closed()
File "c:\python39\lib\asyncio\base_events.py", line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
有了它:
> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:20:29,656 MainThread root INFO : making http request
2021-03-06 22:20:30,106 MainThread root INFO : get data: `{'value': '367'}`
2021-03-06 22:20:35,122 MainThread root INFO : making http request
2021-03-06 22:20:35,863 MainThread root INFO : get data: `{'value': '489'}`
2021-03-06 22:20:38,695 MainThread root INFO : SIGINT caught!
2021-03-06 22:20:40,867 MainThread root INFO : stop event was set, sleeping to let aiohttp close it's connections
2021-03-06 22:20:40,962 MainThread root INFO : sleep finished, returning