如果协程使用 asyncio 引发异常,如何关闭循环并打印错误?
How to shutdown the loop and print error if coroutine raised an exception with asyncio?
假设我有几个协程 运行 在一个循环中。如果其中一些程序因异常而失败,那么如何使整个程序因该异常而失败?因为现在 asyncio 甚至不打印协程的错误消息,除非我使用日志记录级别 "DEBUG".
from asyncio import get_event_loop, sleep
async def c(sleep_time=2, fail=False):
print('c', sleep_time, fail)
if fail:
raise Exception('fail')
while True:
print('doing stuff')
await sleep(sleep_time)
loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
loop.run_forever()
以下是您可能希望用来制定解决方案的一些注意事项:
检索协程异常(或结果!)的最简单方法是 await
。 asyncio.gather()
将从协程创建任务并将所有任务包装在一个包含任务中,如果其中一个子任务失败,该任务将失败:
import asyncio
import random
async def coro(n):
print("Start", n)
await asyncio.sleep(random.uniform(0.2, 0.5))
if n % 4 == 0:
raise Exception('fail ({})'.format(n))
return "OK: {}".format(n)
async def main():
tasks = [coro(i) for i in range(10)]
await asyncio.gather(*tasks)
print("done")
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
但这并没有关闭循环。要停止 运行 循环,请使用 loop.stop()
。改用这个:
async def main():
tasks = [coro(i) for i in range(10)]
try:
await asyncio.gather(*tasks)
except Exception as e:
loop.stop()
raise
print("done")
在某些长 运行 协程 运行 时停止循环可能不是您想要的。您可能希望首先使用事件向您的一些协程发出关闭信号:
import asyncio
import random
async def repeat(n):
print("start", n)
while not shutting_down.is_set():
print("repeat", n)
await asyncio.sleep(random.uniform(1, 3))
print("done", n)
async def main():
print("waiting 6 seconds..")
await asyncio.sleep(6)
print("shutting down")
shutting_down.set() # not a coroutine!
print("waiting")
await asyncio.wait(long_running)
print("done")
loop.stop()
loop = asyncio.get_event_loop()
shutting_down = asyncio.Event(loop=loop)
long_running = [loop.create_task(repeat(i + 1)) for i in range(5)]
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
如果您不想 await
执行您的任务,您可能需要使用 asyncio.Event
(或 asyncio.Queue
)来通知全局错误处理程序停止循环:
import asyncio
async def fail():
try:
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
raise Exception('fail')
except Exception as e:
error_event.payload = e
error_event.set()
raise # optional
async def error_handler():
await error_event.wait()
e = error_event.payload
print("Got:", e)
raise e
loop = asyncio.get_event_loop()
error_event = asyncio.Event()
try:
loop.create_task(fail())
loop.run_until_complete(error_handler())
finally:
loop.close()
(为简单起见,此处与 run_until_complete()
一起使用,但也可与 loop.stop()
一起使用)
好的,我找到了不需要重写任何现有代码的解决方案。它可能看起来很hacky,但我觉得我喜欢它。
因为我已经像这样抓住了 KeyboardInterrupt
。
def main(task=None):
task = task or start()
loop = get_event_loop()
try:
task = loop.create_task(task)
future = ensure_future(task)
loop.run_until_complete(task)
except KeyboardInterrupt:
print('\nperforming cleanup...')
task.cancel()
loop.run_until_complete(future)
loop.close()
sys.exit()
如何从协程向自身发送 KeyboardInterrupt
?我认为这会挂起应用程序,因为 os.kill
会等待应用程序关闭,并且因为它等待的应用程序是同一个应用程序,所以它会造成某种死锁,但幸运的是我错了。这段代码在退出前确实有效并打印 clean up
。
async def c(sleep_time=2, fail=False):
print('c', sleep_time, fail)
if fail:
await sleep(sleep_time)
os.kill(os.getpid(), signal.SIGINT)
while True:
print('doing stuff')
await sleep(sleep_time)
loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
try:
loop.run_forever()
except KeyboardInterrupt:
print('clean up')
loop.close()
sys.exit()
一种优雅的方法是使用错误处理 api。
https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api
示例:
import asyncio
async def run_division(a, b):
await asyncio.sleep(2)
return a / b
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ZeroDivisionError):
print(context)
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(run_division(1, 0))
loop.run_forever()
假设我有几个协程 运行 在一个循环中。如果其中一些程序因异常而失败,那么如何使整个程序因该异常而失败?因为现在 asyncio 甚至不打印协程的错误消息,除非我使用日志记录级别 "DEBUG".
from asyncio import get_event_loop, sleep
async def c(sleep_time=2, fail=False):
print('c', sleep_time, fail)
if fail:
raise Exception('fail')
while True:
print('doing stuff')
await sleep(sleep_time)
loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
loop.run_forever()
以下是您可能希望用来制定解决方案的一些注意事项:
检索协程异常(或结果!)的最简单方法是 await
。 asyncio.gather()
将从协程创建任务并将所有任务包装在一个包含任务中,如果其中一个子任务失败,该任务将失败:
import asyncio
import random
async def coro(n):
print("Start", n)
await asyncio.sleep(random.uniform(0.2, 0.5))
if n % 4 == 0:
raise Exception('fail ({})'.format(n))
return "OK: {}".format(n)
async def main():
tasks = [coro(i) for i in range(10)]
await asyncio.gather(*tasks)
print("done")
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
但这并没有关闭循环。要停止 运行 循环,请使用 loop.stop()
。改用这个:
async def main():
tasks = [coro(i) for i in range(10)]
try:
await asyncio.gather(*tasks)
except Exception as e:
loop.stop()
raise
print("done")
在某些长 运行 协程 运行 时停止循环可能不是您想要的。您可能希望首先使用事件向您的一些协程发出关闭信号:
import asyncio
import random
async def repeat(n):
print("start", n)
while not shutting_down.is_set():
print("repeat", n)
await asyncio.sleep(random.uniform(1, 3))
print("done", n)
async def main():
print("waiting 6 seconds..")
await asyncio.sleep(6)
print("shutting down")
shutting_down.set() # not a coroutine!
print("waiting")
await asyncio.wait(long_running)
print("done")
loop.stop()
loop = asyncio.get_event_loop()
shutting_down = asyncio.Event(loop=loop)
long_running = [loop.create_task(repeat(i + 1)) for i in range(5)]
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
如果您不想 await
执行您的任务,您可能需要使用 asyncio.Event
(或 asyncio.Queue
)来通知全局错误处理程序停止循环:
import asyncio
async def fail():
try:
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
raise Exception('fail')
except Exception as e:
error_event.payload = e
error_event.set()
raise # optional
async def error_handler():
await error_event.wait()
e = error_event.payload
print("Got:", e)
raise e
loop = asyncio.get_event_loop()
error_event = asyncio.Event()
try:
loop.create_task(fail())
loop.run_until_complete(error_handler())
finally:
loop.close()
(为简单起见,此处与 run_until_complete()
一起使用,但也可与 loop.stop()
一起使用)
好的,我找到了不需要重写任何现有代码的解决方案。它可能看起来很hacky,但我觉得我喜欢它。
因为我已经像这样抓住了 KeyboardInterrupt
。
def main(task=None):
task = task or start()
loop = get_event_loop()
try:
task = loop.create_task(task)
future = ensure_future(task)
loop.run_until_complete(task)
except KeyboardInterrupt:
print('\nperforming cleanup...')
task.cancel()
loop.run_until_complete(future)
loop.close()
sys.exit()
如何从协程向自身发送 KeyboardInterrupt
?我认为这会挂起应用程序,因为 os.kill
会等待应用程序关闭,并且因为它等待的应用程序是同一个应用程序,所以它会造成某种死锁,但幸运的是我错了。这段代码在退出前确实有效并打印 clean up
。
async def c(sleep_time=2, fail=False):
print('c', sleep_time, fail)
if fail:
await sleep(sleep_time)
os.kill(os.getpid(), signal.SIGINT)
while True:
print('doing stuff')
await sleep(sleep_time)
loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
try:
loop.run_forever()
except KeyboardInterrupt:
print('clean up')
loop.close()
sys.exit()
一种优雅的方法是使用错误处理 api。
https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api
示例:
import asyncio
async def run_division(a, b):
await asyncio.sleep(2)
return a / b
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ZeroDivisionError):
print(context)
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(run_division(1, 0))
loop.run_forever()