asyncio.sleep 取消任务后需要吗?
asyncio.sleep required after cancelling tasks?
为了测试中间人 tcp 代理,我编写了一个 echo tcp 服务器和一个 tcp 客户端。在每一项测试之后,我都希望代理和服务器关闭,以确保每项测试都在干净的环境中开始,因此我编写了代码:
class TestConnections(TestCase):
loop = None
@classmethod
def setUpClass(cls) -> None:
TestConnections.loop = asyncio.new_event_loop()
asyncio.set_event_loop(TestConnections.loop)
def setUp(self) -> None:
EnergyAgent.GP = GrowattParser.GrowattParser()
self.server = self.loop.create_task(tcp_server(1235))
self.gp = self.loop.create_task(EnergyAgentProxy(1234, 1235))
def tearDown(self) -> None:
logger.debug('Cancelling the tasks')
self.server.cancel()
self.gp.cancel()
self.loop.run_until_complete(asyncio.sleep(0.5))
@classmethod
def tearDownClass(cls) -> None:
logger.debug('Closing the event loop')
TestConnections.loop.close()
def test_client2server(self):
# start the client process, and wait until done
result = self.loop.run_until_complete(asyncio.wait_for(tcp_client(1235, message), timeout=2))
self.assertEqual(message, result)
现在,问题是:除非我在方法 tearDown 中添加最后一行
self.loop.run_until_complete(asyncio.sleep(0.5))
我收到有关代理上的任务尚未完成的错误消息:
ERROR:asyncio:Task was destroyed but it is pending!
有什么方法可以让所有的任务都完成吗?我试过了运行
self.loop.run_until_complete(asyncio.wait([self.server, self.gp]))
和asyncio.gather...未成功。
我假设正确的实例 tearDown
方法,class tearDownClass
被调用?
当您在任务完成之前关闭事件循环时,通常会出现该错误。当您取消任务时,它会立即 returns,但任务本身尚未取消,而是等待循环将其抛出 CancelledError
。所以你应该 await
让任务收到它的终止错误。如果你在任务有机会被抛出之前关闭事件循环,它不会被视为已取消,你会得到那个错误。
通过拨打 self.loop.run_until_complete(asyncio.sleep(0.5))
电话,您实际上是在无意中这样做的。
文档 elaborate about that
也结账
您正在手动管理您的事件循环,但没有等待计划任务在退出之前完成,并且同步拆卸永远不会自行调用事件循环。因此,当 Python 退出并强制删除它们时,任务仍然是 运行。
我建议使用 unittest.IsolatedAsyncioTestCase
而不是自己管理,这将在退出测试用例时适当地取消和等待 left-over 任务。
你上面的代码看起来像这样
class TestConnections(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None:
EnergyAgent.GP = GrowattParser.GrowattParser()
self.server = asyncio.create_task(tcp_server(1235))
self.gp = asyncio.create_task(EnergyAgentProxy(1234, 1235))
async def test_client2server(self):
# start the client process, and wait until done
result = await asyncio.wait_for(tcp_client(1235, message), timeout=2)
self.assertEqual(message, result)
如果为每个测试用例创建新的事件循环是一个问题,您可以在关闭之前自己清理事件循环,方法是通过 asyncio.all_tasks
获取所有任务,取消它们,等待它们完成它们通过 run_until_complete
执行(被取消或引发异常),然后 运行 loop.shutdown_asyncgens
.
为了测试中间人 tcp 代理,我编写了一个 echo tcp 服务器和一个 tcp 客户端。在每一项测试之后,我都希望代理和服务器关闭,以确保每项测试都在干净的环境中开始,因此我编写了代码:
class TestConnections(TestCase):
loop = None
@classmethod
def setUpClass(cls) -> None:
TestConnections.loop = asyncio.new_event_loop()
asyncio.set_event_loop(TestConnections.loop)
def setUp(self) -> None:
EnergyAgent.GP = GrowattParser.GrowattParser()
self.server = self.loop.create_task(tcp_server(1235))
self.gp = self.loop.create_task(EnergyAgentProxy(1234, 1235))
def tearDown(self) -> None:
logger.debug('Cancelling the tasks')
self.server.cancel()
self.gp.cancel()
self.loop.run_until_complete(asyncio.sleep(0.5))
@classmethod
def tearDownClass(cls) -> None:
logger.debug('Closing the event loop')
TestConnections.loop.close()
def test_client2server(self):
# start the client process, and wait until done
result = self.loop.run_until_complete(asyncio.wait_for(tcp_client(1235, message), timeout=2))
self.assertEqual(message, result)
现在,问题是:除非我在方法 tearDown 中添加最后一行
self.loop.run_until_complete(asyncio.sleep(0.5))
我收到有关代理上的任务尚未完成的错误消息:
ERROR:asyncio:Task was destroyed but it is pending!
有什么方法可以让所有的任务都完成吗?我试过了运行
self.loop.run_until_complete(asyncio.wait([self.server, self.gp]))
和asyncio.gather...未成功。
我假设正确的实例 tearDown
方法,class tearDownClass
被调用?
当您在任务完成之前关闭事件循环时,通常会出现该错误。当您取消任务时,它会立即 returns,但任务本身尚未取消,而是等待循环将其抛出 CancelledError
。所以你应该 await
让任务收到它的终止错误。如果你在任务有机会被抛出之前关闭事件循环,它不会被视为已取消,你会得到那个错误。
通过拨打 self.loop.run_until_complete(asyncio.sleep(0.5))
电话,您实际上是在无意中这样做的。
文档 elaborate about that
也结账
您正在手动管理您的事件循环,但没有等待计划任务在退出之前完成,并且同步拆卸永远不会自行调用事件循环。因此,当 Python 退出并强制删除它们时,任务仍然是 运行。
我建议使用 unittest.IsolatedAsyncioTestCase
而不是自己管理,这将在退出测试用例时适当地取消和等待 left-over 任务。
你上面的代码看起来像这样
class TestConnections(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None:
EnergyAgent.GP = GrowattParser.GrowattParser()
self.server = asyncio.create_task(tcp_server(1235))
self.gp = asyncio.create_task(EnergyAgentProxy(1234, 1235))
async def test_client2server(self):
# start the client process, and wait until done
result = await asyncio.wait_for(tcp_client(1235, message), timeout=2)
self.assertEqual(message, result)
如果为每个测试用例创建新的事件循环是一个问题,您可以在关闭之前自己清理事件循环,方法是通过 asyncio.all_tasks
获取所有任务,取消它们,等待它们完成它们通过 run_until_complete
执行(被取消或引发异常),然后 运行 loop.shutdown_asyncgens
.