从同步代码安排任务到 运行 事件循环

Schedule task to running event loop from synchronous code

考虑这个程序,其中的主循环和停止它的协程实际上是由我正在使用的库实现的。

import asyncio
import signal

running = True

async def stop():
    global running
    print("setting false")
    running = False
    await asyncio.sleep(3)
    print("reached end")

async def mainloop():
    while running:
        print("loop")
        await asyncio.sleep(1)

def handle_signal():
    loop.create_task(stop())

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, handle_signal)
loop.run_until_complete(mainloop())
loop.close()

当程序接收到信号时,我需要调用停止协程来停止主循环。虽然在使用 asyncio.BaseEventLoop.create_task 安排停止协程时,它首先停止主循环,这会停止事件循环,并且停止协程无法完成:

$ ./test.py 
loop
loop
loop
^Csetting false
Task was destroyed but it is pending!
task: <Task pending coro=<stop() done, defined at ./test.py:7> wait_for=<Future pending cb=[Task._wakeup()]>>

如何将协程添加到 运行 事件循环中,同时让事件循环等待完成?

正如您所发现的,问题是事件循环只等待 mainloop() 完成,留下 stop() 未决,asyncio 正确地抱怨。

如果 handle_signal 和顶级代码在您的控制之下,您可以轻松地用循环直到 mainloop 完成替换循环直到自定义协程完成。这个协程将调用 mainloop 然后等待清理代码完成:

# ... omitted definition of mainloop() and stop()

# list of tasks that must be waited for before we can actually exit
_cleanup = []

async def run():
    await mainloop()
    # wait for all _cleanup tasks to finish
    await asyncio.wait(_cleanup)

def handle_signal():
    # schedule stop() to run, and also add it to the list of
    # tasks run() must wait for before it is done
    _cleanup.append(loop.create_task(stop()))

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, handle_signal)
loop.run_until_complete(run())
loop.close()

另一个不需要新的 run() 协程(但仍然需要修改后的 handle_signal)的选项是在 mainloop 之后发出第二个 run_until_complete()完成:

# handle_signal and _cleanup defined as above

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, handle_signal)
loop.run_until_complete(mainloop())
if _cleanup:
    loop.run_until_complete(asyncio.wait(_cleanup))
loop.close()