如何使 asyncio cancel() 真正取消任务?
How to make asyncio cancel() to actually cancel the task?
这段代码陷入死循环,self.task.cancel()
似乎没有效果:
import asyncio
from unittest.mock import AsyncMock, patch
async def loop():
while True:
await asyncio.sleep(1)
class AsyncSleepMock(AsyncMock):
def __init__(self):
super(AsyncMock, self).__init__()
self.task = None
async def __call__(self, delay, *args, **kwargs):
self.task.cancel()
return await super(AsyncMock, self).__call__(delay, *args, **kwargs)
def create_async_sleep_mock():
return AsyncSleepMock()
@patch("asyncio.sleep", new_callable=create_async_sleep_mock)
def main(async_sleep_mock):
loop_task = asyncio.get_event_loop().create_task(loop())
async_sleep_mock.task = loop_task
asyncio.get_event_loop().run_until_complete(loop_task)
if __name__ == "__main__":
main()
目标是制作一个模拟 asyncio.sleep()
,可以从被测应用程序拥有的无限 loop()
中跳出来。怎么做?
self.task.cancel()
将任务标记为已取消,但此时这是 CPU 上的活动任务。必须发生任务切换以允许调度程序处理取消。
来自 cancel() 文档:
Request the Task to be cancelled.
This arranges for a CancelledError exception to be thrown into the
wrapped coroutine on the next cycle of the event loop.
我插入了一个未模拟的 await asyncio.sleep(0)
以确保所需的任务切换,现在它不再循环:
realsleep = asyncio.sleep
class AsyncSleepMock(AsyncMock):
def __init__(self):
super(AsyncMock, self).__init__()
self.task = None
async def __call__(self, delay, *args, **kwargs):
self.task.cancel()
await realsleep(0)
return await super(AsyncMock, self).__call__(delay, *args, **kwargs)
为了完整起见,我添加了 asyncio.sleep() 描述中的引述:
sleep() always suspends the current task, allowing other tasks to run.
Setting the delay to 0 provides an optimized path to allow other tasks
to run. This can be used by long-running functions to avoid blocking
the event loop for the full duration of the function call.
这段代码陷入死循环,self.task.cancel()
似乎没有效果:
import asyncio
from unittest.mock import AsyncMock, patch
async def loop():
while True:
await asyncio.sleep(1)
class AsyncSleepMock(AsyncMock):
def __init__(self):
super(AsyncMock, self).__init__()
self.task = None
async def __call__(self, delay, *args, **kwargs):
self.task.cancel()
return await super(AsyncMock, self).__call__(delay, *args, **kwargs)
def create_async_sleep_mock():
return AsyncSleepMock()
@patch("asyncio.sleep", new_callable=create_async_sleep_mock)
def main(async_sleep_mock):
loop_task = asyncio.get_event_loop().create_task(loop())
async_sleep_mock.task = loop_task
asyncio.get_event_loop().run_until_complete(loop_task)
if __name__ == "__main__":
main()
目标是制作一个模拟 asyncio.sleep()
,可以从被测应用程序拥有的无限 loop()
中跳出来。怎么做?
self.task.cancel()
将任务标记为已取消,但此时这是 CPU 上的活动任务。必须发生任务切换以允许调度程序处理取消。
来自 cancel() 文档:
Request the Task to be cancelled.
This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.
我插入了一个未模拟的 await asyncio.sleep(0)
以确保所需的任务切换,现在它不再循环:
realsleep = asyncio.sleep
class AsyncSleepMock(AsyncMock):
def __init__(self):
super(AsyncMock, self).__init__()
self.task = None
async def __call__(self, delay, *args, **kwargs):
self.task.cancel()
await realsleep(0)
return await super(AsyncMock, self).__call__(delay, *args, **kwargs)
为了完整起见,我添加了 asyncio.sleep() 描述中的引述:
sleep() always suspends the current task, allowing other tasks to run.
Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.