py.test 混合 fixtures 和 asyncio 协程
py.test mixing fixtures and asyncio coroutines
我正在使用 py.test 为 python3 代码构建一些测试。该代码使用 aiopg(基于 Asyncio 的 postgres 接口)访问 Postgresql 数据库。
我的主要期望:
每个测试用例都应该可以访问新的异步事件循环。
运行时间过长的测试将因超时异常而停止。
每个测试用例都应该可以访问数据库连接。
我不想在写测试用例的时候重复自己。
使用 py.test fixtures 我可以非常接近我想要的东西,但我仍然需要在每个异步测试用例中重复一下自己。
我的代码是这样的:
@pytest.fixture(scope='function')
def tloop(request):
# This fixture is responsible for getting a new event loop
# for every test, and close it when the test ends.
...
def run_timeout(cor,loop,timeout=ASYNC_TEST_TIMEOUT):
"""
Run a given coroutine with timeout.
"""
task_with_timeout = asyncio.wait_for(cor,timeout)
try:
loop.run_until_complete(task_with_timeout)
except futures.TimeoutError:
# Timeout:
raise ExceptAsyncTestTimeout()
@pytest.fixture(scope='module')
def clean_test_db(request):
# Empty the test database.
...
@pytest.fixture(scope='function')
def udb(request,clean_test_db,tloop):
# Obtain a connection to the database using aiopg
# (That's why we need tloop here).
...
# An example for a test:
def test_insert_user(tloop,udb):
@asyncio.coroutine
def insert_user():
# Do user insertion here ...
yield from udb.insert_new_user(...
...
run_timeout(insert_user(),tloop)
我可以接受目前的解决方案,但定义内部协程并为我编写的每个异步测试添加 run_timeout 行会变得很麻烦。
我希望我的测试看起来像这样:
@some_magic_decorator
def test_insert_user(udb):
# Do user insertion here ...
yield from udb.insert_new_user(...
...
我试图以某种优雅的方式创建这样的装饰器,但失败了。更一般地说,如果我的测试看起来像:
@some_magic_decorator
def my_test(arg1,arg2,...,arg_n):
...
那么生成的函数(应用装饰器之后)应该是:
def my_test_wrapper(tloop,arg1,arg2,...,arg_n):
run_timeout(my_test(),tloop)
请注意,我的一些测试使用其他固定装置(例如 udb 除外),并且这些固定装置必须显示为生成函数的参数,否则 py.test 将不会调用它们。
我尝试同时使用 wrapt and decorator python 模块来创建这样一个神奇的装饰器,但似乎这两个模块都帮助我创建了一个签名与 my_test 相同的函数,在这种情况下这不是一个好的解决方案。
这可能可以使用 eval 或类似的 hack 来解决,但我想知道我是否遗漏了一些优雅的东西。
Every test case should have access to a new asyncio event loop.
asyncio的测试套件使用unittest.TestCase。它使用 setUp() 方法创建一个新的事件循环。 addCleanup(loop.close) 会自动关闭事件循环,即使出现错误也是如此。
抱歉,如果您不想使用 TestCase,我不知道如何使用 py.test 编写此代码。不过没记错的话,py.test支持unittest.TestCase.
A test that runs too long will stop with a timeout exception.
您可以将 loop.call_later() 与引发 BaseException 的函数一起用作看门狗。
我目前正在尝试解决类似的问题。这是我到目前为止的想法。它似乎有效,但需要一些清理:
# tests/test_foo.py
import asyncio
@asyncio.coroutine
def test_coro(loop):
yield from asyncio.sleep(0.1)
assert 0
# tests/conftest.py
import asyncio
@pytest.yield_fixture
def loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
def pytest_pycollect_makeitem(collector, name, obj):
"""Collect asyncio coroutines as normal functions, not as generators."""
if asyncio.iscoroutinefunction(obj):
return list(collector._genfunctions(name, obj))
def pytest_pyfunc_call(pyfuncitem):
"""If ``pyfuncitem.obj`` is an asyncio coroutinefunction, execute it via
the event loop instead of calling it directly."""
testfunction = pyfuncitem.obj
if not asyncio.iscoroutinefunction(testfunction):
return
# Copied from _pytest/python.py:pytest_pyfunc_call()
funcargs = pyfuncitem.funcargs
testargs = {}
for arg in pyfuncitem._fixtureinfo.argnames:
testargs[arg] = funcargs[arg]
coro = testfunction(**testargs) # Will no execute the test yet!
# Run the coro in the event loop
loop = testargs.get('loop', asyncio.get_event_loop())
loop.run_until_complete(coro)
return True # TODO: What to return here?
所以我基本上让pytest像普通函数一样收集asyncio协程。我还拦截函数的文本执行。如果待测函数是协程,我在事件循环中执行。无论有没有夹具,它都可以为每个测试创建一个新的事件循环实例。
编辑: 根据 Ronny Pfannschmidt 的说法,像这样的东西将在 2.7 版本后添加到 pytest 中。 :-)
我正在使用 py.test 为 python3 代码构建一些测试。该代码使用 aiopg(基于 Asyncio 的 postgres 接口)访问 Postgresql 数据库。
我的主要期望:
每个测试用例都应该可以访问新的异步事件循环。
运行时间过长的测试将因超时异常而停止。
每个测试用例都应该可以访问数据库连接。
我不想在写测试用例的时候重复自己。
使用 py.test fixtures 我可以非常接近我想要的东西,但我仍然需要在每个异步测试用例中重复一下自己。
我的代码是这样的:
@pytest.fixture(scope='function')
def tloop(request):
# This fixture is responsible for getting a new event loop
# for every test, and close it when the test ends.
...
def run_timeout(cor,loop,timeout=ASYNC_TEST_TIMEOUT):
"""
Run a given coroutine with timeout.
"""
task_with_timeout = asyncio.wait_for(cor,timeout)
try:
loop.run_until_complete(task_with_timeout)
except futures.TimeoutError:
# Timeout:
raise ExceptAsyncTestTimeout()
@pytest.fixture(scope='module')
def clean_test_db(request):
# Empty the test database.
...
@pytest.fixture(scope='function')
def udb(request,clean_test_db,tloop):
# Obtain a connection to the database using aiopg
# (That's why we need tloop here).
...
# An example for a test:
def test_insert_user(tloop,udb):
@asyncio.coroutine
def insert_user():
# Do user insertion here ...
yield from udb.insert_new_user(...
...
run_timeout(insert_user(),tloop)
我可以接受目前的解决方案,但定义内部协程并为我编写的每个异步测试添加 run_timeout 行会变得很麻烦。
我希望我的测试看起来像这样:
@some_magic_decorator
def test_insert_user(udb):
# Do user insertion here ...
yield from udb.insert_new_user(...
...
我试图以某种优雅的方式创建这样的装饰器,但失败了。更一般地说,如果我的测试看起来像:
@some_magic_decorator
def my_test(arg1,arg2,...,arg_n):
...
那么生成的函数(应用装饰器之后)应该是:
def my_test_wrapper(tloop,arg1,arg2,...,arg_n):
run_timeout(my_test(),tloop)
请注意,我的一些测试使用其他固定装置(例如 udb 除外),并且这些固定装置必须显示为生成函数的参数,否则 py.test 将不会调用它们。
我尝试同时使用 wrapt and decorator python 模块来创建这样一个神奇的装饰器,但似乎这两个模块都帮助我创建了一个签名与 my_test 相同的函数,在这种情况下这不是一个好的解决方案。
这可能可以使用 eval 或类似的 hack 来解决,但我想知道我是否遗漏了一些优雅的东西。
Every test case should have access to a new asyncio event loop.
asyncio的测试套件使用unittest.TestCase。它使用 setUp() 方法创建一个新的事件循环。 addCleanup(loop.close) 会自动关闭事件循环,即使出现错误也是如此。
抱歉,如果您不想使用 TestCase,我不知道如何使用 py.test 编写此代码。不过没记错的话,py.test支持unittest.TestCase.
A test that runs too long will stop with a timeout exception.
您可以将 loop.call_later() 与引发 BaseException 的函数一起用作看门狗。
我目前正在尝试解决类似的问题。这是我到目前为止的想法。它似乎有效,但需要一些清理:
# tests/test_foo.py
import asyncio
@asyncio.coroutine
def test_coro(loop):
yield from asyncio.sleep(0.1)
assert 0
# tests/conftest.py
import asyncio
@pytest.yield_fixture
def loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
def pytest_pycollect_makeitem(collector, name, obj):
"""Collect asyncio coroutines as normal functions, not as generators."""
if asyncio.iscoroutinefunction(obj):
return list(collector._genfunctions(name, obj))
def pytest_pyfunc_call(pyfuncitem):
"""If ``pyfuncitem.obj`` is an asyncio coroutinefunction, execute it via
the event loop instead of calling it directly."""
testfunction = pyfuncitem.obj
if not asyncio.iscoroutinefunction(testfunction):
return
# Copied from _pytest/python.py:pytest_pyfunc_call()
funcargs = pyfuncitem.funcargs
testargs = {}
for arg in pyfuncitem._fixtureinfo.argnames:
testargs[arg] = funcargs[arg]
coro = testfunction(**testargs) # Will no execute the test yet!
# Run the coro in the event loop
loop = testargs.get('loop', asyncio.get_event_loop())
loop.run_until_complete(coro)
return True # TODO: What to return here?
所以我基本上让pytest像普通函数一样收集asyncio协程。我还拦截函数的文本执行。如果待测函数是协程,我在事件循环中执行。无论有没有夹具,它都可以为每个测试创建一个新的事件循环实例。
编辑: 根据 Ronny Pfannschmidt 的说法,像这样的东西将在 2.7 版本后添加到 pytest 中。 :-)