pytest 问题与会话范围的夹具和异步
pytest issues with a session scoped fixture and asyncio
我有多个测试文件,每个文件都有一个如下所示的异步装置:
@pytest.fixture(scope="module")
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
async def some_fixture():
return await make_fixture()
我正在使用 xdist 进行并行化。另外我还有这个装饰器:
@toolz.curry
def throttle(limit, f):
semaphore = asyncio.Semaphore(limit)
@functools.wraps(f)
async def wrapped(*args, **kwargs):
async with semaphore:
return await f(*args, **kwargs)
return wrapped
我有一个函数使用它:
@throttle(10)
def f():
...
现在 f
正在从多个测试文件调用,我收到一个异常,告诉我我不能使用来自不同事件循环的信号量。
我尝试转向会话级事件循环装置:
@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
但这只给了我:
ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'module' scoped request object, involved factories
是否可以让 xdist + async fixture + semaphore 一起工作?
最终使用以下方法让它工作 conftest.py
:
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()
我有多个测试文件,每个文件都有一个如下所示的异步装置:
@pytest.fixture(scope="module")
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
async def some_fixture():
return await make_fixture()
我正在使用 xdist 进行并行化。另外我还有这个装饰器:
@toolz.curry
def throttle(limit, f):
semaphore = asyncio.Semaphore(limit)
@functools.wraps(f)
async def wrapped(*args, **kwargs):
async with semaphore:
return await f(*args, **kwargs)
return wrapped
我有一个函数使用它:
@throttle(10)
def f():
...
现在 f
正在从多个测试文件调用,我收到一个异常,告诉我我不能使用来自不同事件循环的信号量。
我尝试转向会话级事件循环装置:
@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
但这只给了我:
ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'module' scoped request object, involved factories
是否可以让 xdist + async fixture + semaphore 一起工作?
最终使用以下方法让它工作 conftest.py
:
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()