如何使用各种关闭机制从同步上下文关闭异步循环
How to shutdown an asyncio loop from a synchronous context with various shutdown mechanisms
我正在尝试提供一个同步关闭函数,它可以通过 SIGTERM
信号或 KeyboardInterrupt
SystemExit
异常优雅地终止 asyncio 应用程序,或者直接调用该函数由于糟糕的启动状态。我必须关闭各种任务,每个任务都有自己的关闭方式:
- 一个 aiohttp
AppRunner
当前通过 shutdown
方法终止,returns 一个需要等待的协程
- 一个 asyncio
APScheduler
当前通过 shutdown
方法终止,该方法在当前事件循环中调用 call_soon_threadsafe
- 一个永远运行的简单异步循环当前通过任务
上的 cancel
信号终止
- aiohttp
ClientSession
通过会话中的 close
方法取消
我想终止消息处理器并忽略任何传入的新消息,调度程序但允许完成当前 运行 依赖于 aiohttp ClientSession
[=30 的任何任务=]
这是当前代码的缩写和一些注释以阐明逻辑:
message_processor_future = loop.create_task(message_processor())
def sig_term_handler(_, __):
logging.info("SIGTERM received, shutting down server...")
shutdown_server(
http_runner=http_runner,
scheduler=scheduler,
message_processor_future=message_processor_future
)
signal.signal(signal.SIGTERM, sig_term_handler)
try:
loop.run_until_complete(message_processor_future)
except (KeyboardInterrupt, SystemExit) as e:
logging.info("{} received".format(e.__class__.__name__))
shutdown_server(
http_runner=http_runner,
scheduler=scheduler,
message_processor_future=message_processor_future
)
async def message_processor():
while True:
try:
# code
except CancelledError:
logging.info("Cancelling message processing...")
return
def shutdown_server(
http_runner: AppRunner = None,
scheduler: AsyncIOScheduler = None,
message_processor_future: asyncio.Task = None
):
loop = asyncio.get_event_loop()
# Try to shutdown to the message processor as early as possible so we don't get any new messages
if message_processor_future:
logging.info("Cancelling message processor...")
message_processor_future.cancel()
# Shutdown apscheduler early to make sure we don't schedule any new tasks
if scheduler:
logging.info("Shutting down scheduler...")
scheduler.shutdown()
# if the server is running then kill it (this doesn't really have any requirements as it's fairly separate from the application)
if http_runner:
logging.info("Shutting down http server...")
loop.run_until_complete(http_runner.cleanup())
logging.info(
f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
)
# wait for any tasks spawned by apscheduler to finish and the message processor to die if it's still running
loop.run_until_complete(
asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
)
logging.info("Closing ingest api client...")
from collector.tasks.ap_associations import api_client
# Kill the client session as the tasks that use ClientSession have completed
loop.run_until_complete(api_client.session.close())
logging.info("Shutting down process...")
exit(0)
当我通过 KeyboardInterrupt
或 SystemExit
取消应用程序时,它会毫无问题地进行清理,这是因为我相信循环已经停止 运行 所以调用 loop.run_until_complete
是安全和同步的,但是当收到 SIGTERM
时,循环仍然是 运行 所以我得到这个异常
[2019-06-03 14:52:26,985] [ INFO] --- Shutting down http server...
[2019-06-03 14:52:26,985] [ ERROR] --- Exception in callback Loop._read_from_self
handle: <Handle Loop._read_from_self>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 67, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 324, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 329, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 304, in uvloop.loop.Loop._ceval_process_signals
File "/opt/collector/collector/__main__.py", line 144, in sig_term_handler
message_processor_future=message_processor_future
File "/opt/collector/collector/__main__.py", line 192, in shutdown_server
loop.run_until_complete(http_runner.cleanup())
File "uvloop/loop.pyx", line 1440, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1433, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1342, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 445, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
这是有道理的,但我不确定如何设置关闭方法来处理这种状态,我尝试使用 add_done_callback
方法,但这似乎也不起作用,因为应用程序卡在 while 循环中等待所有任务完成或被取消。
def shutdown_server(
http_runner: AppRunner = None,
scheduler: AsyncIOScheduler = None,
message_processor_future: asyncio.Task = None
):
loop = asyncio.get_event_loop()
if loop.is_running():
task_runner = loop.create_task
else:
task_runner = loop.run_until_complete
if message_processor_future:
logging.info("Cancelling message processor...")
message_processor_future.cancel()
if scheduler:
logging.info("Shutting down scheduler...")
scheduler.shutdown()
if http_runner:
logging.info("Shutting down http server...")
task_runner(http_runner.shutdown())
logging.info(
f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
)
def finish_shutdown():
task_runner(http_runner.cleanup())
logging.info("Closing ingest api client...")
from collector.tasks.ap_associations import api_client
task_runner(api_client.session.close())
logging.info("Shutting down process...")
exit(0)
if loop.is_running():
all_tasks_complete = loop.create_task(asyncio.wait(
asyncio.Task.all_tasks(loop), timeout=10
))
all_tasks_complete.add_done_callback(finish_shutdown)
while not all_tasks_complete.done() and not all_tasks_complete.cancelled():
pass
else:
loop.run_until_complete(asyncio.wait(
asyncio.Task.all_tasks(loop), timeout=10
))
finish_shutdown()
我意识到你可以在信号处理程序中调用 sys.exit,循环将收到一个 SystemExit 异常并继续执行 catch 子句的其余部分并停止循环。
即
signal.signal(signal.SIGTERM, lambda _, __: sys.exit(0))
这让我可以重构代码,使其更清晰,我还可以强制任务使用这种模式处理它们自己的异常:
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit) as e:
logging.info(f"{e.__class__.__name__} received")
except Exception as e:
exception_manager.handle_exception(e)
finally:
shutdown(http_server_manager, scheduler)
我正在尝试提供一个同步关闭函数,它可以通过 SIGTERM
信号或 KeyboardInterrupt
SystemExit
异常优雅地终止 asyncio 应用程序,或者直接调用该函数由于糟糕的启动状态。我必须关闭各种任务,每个任务都有自己的关闭方式:
- 一个 aiohttp
AppRunner
当前通过shutdown
方法终止,returns 一个需要等待的协程 - 一个 asyncio
APScheduler
当前通过shutdown
方法终止,该方法在当前事件循环中调用call_soon_threadsafe
- 一个永远运行的简单异步循环当前通过任务 上的
- aiohttp
ClientSession
通过会话中的close
方法取消
cancel
信号终止
我想终止消息处理器并忽略任何传入的新消息,调度程序但允许完成当前 运行 依赖于 aiohttp ClientSession
[=30 的任何任务=]
这是当前代码的缩写和一些注释以阐明逻辑:
message_processor_future = loop.create_task(message_processor())
def sig_term_handler(_, __):
logging.info("SIGTERM received, shutting down server...")
shutdown_server(
http_runner=http_runner,
scheduler=scheduler,
message_processor_future=message_processor_future
)
signal.signal(signal.SIGTERM, sig_term_handler)
try:
loop.run_until_complete(message_processor_future)
except (KeyboardInterrupt, SystemExit) as e:
logging.info("{} received".format(e.__class__.__name__))
shutdown_server(
http_runner=http_runner,
scheduler=scheduler,
message_processor_future=message_processor_future
)
async def message_processor():
while True:
try:
# code
except CancelledError:
logging.info("Cancelling message processing...")
return
def shutdown_server(
http_runner: AppRunner = None,
scheduler: AsyncIOScheduler = None,
message_processor_future: asyncio.Task = None
):
loop = asyncio.get_event_loop()
# Try to shutdown to the message processor as early as possible so we don't get any new messages
if message_processor_future:
logging.info("Cancelling message processor...")
message_processor_future.cancel()
# Shutdown apscheduler early to make sure we don't schedule any new tasks
if scheduler:
logging.info("Shutting down scheduler...")
scheduler.shutdown()
# if the server is running then kill it (this doesn't really have any requirements as it's fairly separate from the application)
if http_runner:
logging.info("Shutting down http server...")
loop.run_until_complete(http_runner.cleanup())
logging.info(
f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
)
# wait for any tasks spawned by apscheduler to finish and the message processor to die if it's still running
loop.run_until_complete(
asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
)
logging.info("Closing ingest api client...")
from collector.tasks.ap_associations import api_client
# Kill the client session as the tasks that use ClientSession have completed
loop.run_until_complete(api_client.session.close())
logging.info("Shutting down process...")
exit(0)
当我通过 KeyboardInterrupt
或 SystemExit
取消应用程序时,它会毫无问题地进行清理,这是因为我相信循环已经停止 运行 所以调用 loop.run_until_complete
是安全和同步的,但是当收到 SIGTERM
时,循环仍然是 运行 所以我得到这个异常
[2019-06-03 14:52:26,985] [ INFO] --- Shutting down http server...
[2019-06-03 14:52:26,985] [ ERROR] --- Exception in callback Loop._read_from_self
handle: <Handle Loop._read_from_self>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 67, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 324, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 329, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 304, in uvloop.loop.Loop._ceval_process_signals
File "/opt/collector/collector/__main__.py", line 144, in sig_term_handler
message_processor_future=message_processor_future
File "/opt/collector/collector/__main__.py", line 192, in shutdown_server
loop.run_until_complete(http_runner.cleanup())
File "uvloop/loop.pyx", line 1440, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1433, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1342, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 445, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
这是有道理的,但我不确定如何设置关闭方法来处理这种状态,我尝试使用 add_done_callback
方法,但这似乎也不起作用,因为应用程序卡在 while 循环中等待所有任务完成或被取消。
def shutdown_server(
http_runner: AppRunner = None,
scheduler: AsyncIOScheduler = None,
message_processor_future: asyncio.Task = None
):
loop = asyncio.get_event_loop()
if loop.is_running():
task_runner = loop.create_task
else:
task_runner = loop.run_until_complete
if message_processor_future:
logging.info("Cancelling message processor...")
message_processor_future.cancel()
if scheduler:
logging.info("Shutting down scheduler...")
scheduler.shutdown()
if http_runner:
logging.info("Shutting down http server...")
task_runner(http_runner.shutdown())
logging.info(
f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
)
def finish_shutdown():
task_runner(http_runner.cleanup())
logging.info("Closing ingest api client...")
from collector.tasks.ap_associations import api_client
task_runner(api_client.session.close())
logging.info("Shutting down process...")
exit(0)
if loop.is_running():
all_tasks_complete = loop.create_task(asyncio.wait(
asyncio.Task.all_tasks(loop), timeout=10
))
all_tasks_complete.add_done_callback(finish_shutdown)
while not all_tasks_complete.done() and not all_tasks_complete.cancelled():
pass
else:
loop.run_until_complete(asyncio.wait(
asyncio.Task.all_tasks(loop), timeout=10
))
finish_shutdown()
我意识到你可以在信号处理程序中调用 sys.exit,循环将收到一个 SystemExit 异常并继续执行 catch 子句的其余部分并停止循环。
即
signal.signal(signal.SIGTERM, lambda _, __: sys.exit(0))
这让我可以重构代码,使其更清晰,我还可以强制任务使用这种模式处理它们自己的异常:
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit) as e:
logging.info(f"{e.__class__.__name__} received")
except Exception as e:
exception_manager.handle_exception(e)
finally:
shutdown(http_server_manager, scheduler)