asyncio.sleep 取消任务后需要吗?

asyncio.sleep required after cancelling tasks?

为了测试中间人 tcp 代理,我编写了一个 echo tcp 服务器和一个 tcp 客户端。在每一项测试之后,我都希望代理和服务器关闭,以确保每项测试都在干净的环境中开始,因此我编写了代码:

class TestConnections(TestCase):

    loop = None

    @classmethod
    def setUpClass(cls) -> None:
        TestConnections.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(TestConnections.loop)

    def setUp(self) -> None:
        EnergyAgent.GP = GrowattParser.GrowattParser()
        self.server = self.loop.create_task(tcp_server(1235))
        self.gp = self.loop.create_task(EnergyAgentProxy(1234, 1235))

    def tearDown(self) -> None:
        logger.debug('Cancelling the tasks')
        self.server.cancel()
        self.gp.cancel()
        self.loop.run_until_complete(asyncio.sleep(0.5))

    @classmethod
    def tearDownClass(cls) -> None:
        logger.debug('Closing the event loop')
        TestConnections.loop.close()

    def test_client2server(self):
        # start the client process, and wait until done
        result = self.loop.run_until_complete(asyncio.wait_for(tcp_client(1235, message), timeout=2))
        self.assertEqual(message, result)

现在,问题是:除非我在方法 tearDown 中添加最后一行

self.loop.run_until_complete(asyncio.sleep(0.5))

我收到有关代理上的任务尚未完成的错误消息:

ERROR:asyncio:Task was destroyed but it is pending!

有什么方法可以让所有的任务都完成吗?我试过了运行

self.loop.run_until_complete(asyncio.wait([self.server, self.gp]))

和asyncio.gather...未成功。

我假设正确的实例 tearDown 方法,class tearDownClass 被调用?

当您在任务完成之前关闭事件循环时,通常会出现该错误。当您取消任务时,它会立即 returns,但任务本身尚未取消,而是等待循环将其抛出 CancelledError。所以你应该 await 让任务收到它的终止错误。如果你在任务有机会被抛出之前关闭事件循环,它不会被视为已取消,你会得到那个错误。

通过拨打 self.loop.run_until_complete(asyncio.sleep(0.5)) 电话,您实际上是在无意中这样做的。

文档 elaborate about that 也结账

您正在手动管理您的事件循环,但没有等待计划任务在退出之前完成,并且同步拆卸永远不会自行调用事件循环。因此,当 Python 退出并强制删除它们时,任务仍然是 运行。

我建议使用 unittest.IsolatedAsyncioTestCase 而不是自己管理,这将在退出测试用例时适当地取消和等待 left-over 任务。

你上面的代码看起来像这样

class TestConnections(unittest.IsolatedAsyncioTestCase):

    async def asyncSetUp(self) -> None:
        EnergyAgent.GP = GrowattParser.GrowattParser()
        self.server = asyncio.create_task(tcp_server(1235))
        self.gp = asyncio.create_task(EnergyAgentProxy(1234, 1235))

    async def test_client2server(self):
        # start the client process, and wait until done
        result = await asyncio.wait_for(tcp_client(1235, message), timeout=2)
        self.assertEqual(message, result)

如果为每个测试用例创建新的事件循环是一个问题,您可以在关闭之前自己清理事件循环,方法是通过 asyncio.all_tasks 获取所有任务,取消它们,等待它们完成它们通过 run_until_complete 执行(被取消或引发异常),然后 运行 loop.shutdown_asyncgens.