如何使 asyncio cancel() 真正取消任务?

How to make asyncio cancel() to actually cancel the task?

这段代码陷入死循环,self.task.cancel()似乎没有效果:

import asyncio

from unittest.mock import AsyncMock, patch


async def loop():
    while True:
        await asyncio.sleep(1)


class AsyncSleepMock(AsyncMock):
    def __init__(self):
        super(AsyncMock, self).__init__()
        self.task = None

    async def __call__(self, delay, *args, **kwargs):
        self.task.cancel()
        return await super(AsyncMock, self).__call__(delay, *args, **kwargs)


def create_async_sleep_mock():
    return AsyncSleepMock()


@patch("asyncio.sleep", new_callable=create_async_sleep_mock)
def main(async_sleep_mock):
    loop_task = asyncio.get_event_loop().create_task(loop())
    async_sleep_mock.task = loop_task
    asyncio.get_event_loop().run_until_complete(loop_task)


if __name__ == "__main__":
    main()

目标是制作一个模拟 asyncio.sleep(),可以从被测应用程序拥有的无限 loop() 中跳出来。怎么做?

self.task.cancel() 将任务标记为已取消,但此时这是 CPU 上的活动任务。必须发生任务切换以允许调度程序处理取消。

来自 cancel() 文档:

Request the Task to be cancelled.

This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

我插入了一个未模拟的 await asyncio.sleep(0) 以确保所需的任务切换,现在它不再循环:

realsleep = asyncio.sleep

class AsyncSleepMock(AsyncMock):
    def __init__(self):
        super(AsyncMock, self).__init__()
        self.task = None

    async def __call__(self, delay, *args, **kwargs):
        self.task.cancel()
        await realsleep(0)
        return await super(AsyncMock, self).__call__(delay, *args, **kwargs)

为了完整起见,我添加了 asyncio.sleep() 描述中的引述:

sleep() always suspends the current task, allowing other tasks to run.

Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.