将 Ctrl + C 发送到 asyncio 任务

Send Ctrl + C to asyncio Task

所以我有两个任务 运行 直到在我的事件循环中完成。我想在 运行 那些任务时处理 KeyboardInterrupt 并在接收它时向事件循环中的任务发送相同的信号。

 loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(
            task_1(),
            task_2()
        ))
    except KeyboardInterrupt:
        pass
        # Here I want to send sigterm to the tasks in event loop

有什么办法吗?

改用asyncio.run()asyncio.run()接收异常时全部取消tasks/futures

asyncio.gather() 会在自己被取消时取消坦克。大家可以抓拍asyncio.CancelledError左右asyncio.sleep()看看。丢弃它会保留任务 运行.

async def task_1():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError as ex:
        print('task1', type(ex))
        raise


async def task_2():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError as ex:
        print('task2', type(ex))
        raise


async def main():
    await asyncio.gather(task_1(), task_2())


if __name__ == '__main__':
    asyncio.run(main())

否则,您需要保留对任务的引用,取消它们,然后 re-run 事件循环传播 CancelledError,就像 asynio.run() 在幕后所做的那样。