运行 tornado.testing.AsyncTestCase 使用异步事件循环

Run tornado.testing.AsyncTestCase using asyncio event loop

我有一个基于 asyncio 的 class,我想对其进行单元测试。使用 tornado.testing.AsyncTestCase 这非常有效且容易。但是,我的 class 的一个特定方法使用 asyncio.ensure_future 来安排另一个方法的执行。这永远不会在 AsyncTestCase 中完成,因为默认测试 运行ner 使用龙卷风 KQueueIOLoop 事件循环,而不是异步事件循环。

class TestSubject:
    def foo(self):
        asyncio.ensure_future(self.bar())

    async def bar(self):
        pass
class TestSubjectTest(AsyncTestCase):
    def test_foo(self):
        t = TestSubject()
        # here be somewhat involved setup with MagicMock and self.stop
        t.foo()
        self.wait()
$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
    task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited

我如何使用不同的事件循环来 运行 测试以确保我的任务将实际执行?或者,如何使我的实现事件循环独立且交叉兼容?

事实证明很简单...

class TestSubjectTest(AsyncTestCase):
    def get_new_ioloop(self):  # override this method
        return tornado.platform.asyncio.AsyncIOMainLoop()

我之前也在尝试这个,但是直接返回了asyncio.get_event_loop(),没有成功。返回 Tornado 的 asyncio 循环包装器就可以了。