无法执行操作:pytest 中正在进行另一个操作
cannot perform operation: another operation is in progress in pytest
我想测试一些与 asyncpg
一起工作的功能。如果我 运行 一次测试一个,它工作正常。但是如果我一次 运行 多个测试,除了第一个测试之外的所有测试都会崩溃并显示错误 asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
.
测试:
@pytest.mark.asyncio
async def test_project_connection(superuser_id, project_id):
data = element_data_random(project_id)
element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]
project_elements = (await db_projects_element_ids_get([project_id]))[project_id]
assert element_id in project_elements
@pytest.mark.asyncio
async def test_project_does_not_exist(superuser_id):
data = element_data_random(str(uuid.uuid4()))
with pytest.raises(ObjectWithIdDoesNotExistError):
await resolve_element_create(data=data, user_id=superuser_id)
使用数据库使用池的所有函数如下所示:
async def <some_db_func>(*args):
pool = await get_pool()
await pool.execute(...) # or fetch/fetchrow/fetchval
我如何获得池:
db_pool = None
async def get_pool():
global db_pool
async def init(con):
await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
await con.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
if not db_pool:
dockerfiles_dir = os.path.join(src_dir, 'dockerfiles')
env_path = os.path.join(dockerfiles_dir, 'dev.env')
try:
# When code and DB inside docker containers
host = 'postgres-docker'
socket.gethostbyname(host)
except socket.error:
# When code on localhost, but DB inside docker container
host = 'localhost'
load_dotenv(dotenv_path=env_path)
db_pool = await asyncpg.create_pool(
database=os.getenv("POSTGRES_DBNAME"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
host=host,
init=init
)
return db_pool
据我了解,如果您 运行 通过池请求,asynсpg
会创建一个新连接并 运行 在该连接内发送请求。这清楚地表明每个请求都应该有自己的连接。但是出现这个错误,是一个连接试图同时处理两个请求时导致的
好的,多亏了@Adelin,我意识到我需要 运行 每个同步的异步测试。我是 asyncio
的新手,所以我没有马上理解并找到了解决方案。
是:
@pytest.mark.asyncio
async def test_...(*args):
result = await <some_async_func>
assert result == excepted_result
变成:
def test_...(*args):
async def inner()
result = await <some_async_func>
assert result == excepted_result
asyncio.get_event_loop().run_until_complete(inner())
我想测试一些与 asyncpg
一起工作的功能。如果我 运行 一次测试一个,它工作正常。但是如果我一次 运行 多个测试,除了第一个测试之外的所有测试都会崩溃并显示错误 asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
.
测试:
@pytest.mark.asyncio
async def test_project_connection(superuser_id, project_id):
data = element_data_random(project_id)
element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]
project_elements = (await db_projects_element_ids_get([project_id]))[project_id]
assert element_id in project_elements
@pytest.mark.asyncio
async def test_project_does_not_exist(superuser_id):
data = element_data_random(str(uuid.uuid4()))
with pytest.raises(ObjectWithIdDoesNotExistError):
await resolve_element_create(data=data, user_id=superuser_id)
使用数据库使用池的所有函数如下所示:
async def <some_db_func>(*args):
pool = await get_pool()
await pool.execute(...) # or fetch/fetchrow/fetchval
我如何获得池:
db_pool = None
async def get_pool():
global db_pool
async def init(con):
await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
await con.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')
if not db_pool:
dockerfiles_dir = os.path.join(src_dir, 'dockerfiles')
env_path = os.path.join(dockerfiles_dir, 'dev.env')
try:
# When code and DB inside docker containers
host = 'postgres-docker'
socket.gethostbyname(host)
except socket.error:
# When code on localhost, but DB inside docker container
host = 'localhost'
load_dotenv(dotenv_path=env_path)
db_pool = await asyncpg.create_pool(
database=os.getenv("POSTGRES_DBNAME"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
host=host,
init=init
)
return db_pool
据我了解,如果您 运行 通过池请求,asynсpg
会创建一个新连接并 运行 在该连接内发送请求。这清楚地表明每个请求都应该有自己的连接。但是出现这个错误,是一个连接试图同时处理两个请求时导致的
好的,多亏了@Adelin,我意识到我需要 运行 每个同步的异步测试。我是 asyncio
的新手,所以我没有马上理解并找到了解决方案。
是:
@pytest.mark.asyncio
async def test_...(*args):
result = await <some_async_func>
assert result == excepted_result
变成:
def test_...(*args):
async def inner()
result = await <some_async_func>
assert result == excepted_result
asyncio.get_event_loop().run_until_complete(inner())