使用工作单元模式时测试后的 Pytest Flask 回滚事务
Pytest Flask rollback transactions after tests when using the Unit of Work pattern
我正在研究“Cosmic Python”一书,第 6 章解释了如何使用工作单元模式来改变与 database/repository 的交互。
本书第 6 章可在此处访问:
https://www.cosmicpython.com/book/chapter_06_uow.html
作者提供的代码如下:
from __future__ import annotations
import abc
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from allocation import config
from allocation.adapters import repository
class AbstractUnitOfWork(abc.ABC):
products: repository.AbstractRepository
def __enter__(self) -> AbstractUnitOfWork:
return self
def __exit__(self, *args):
self.rollback()
@abc.abstractmethod
def commit(self):
raise NotImplementedError
@abc.abstractmethod
def rollback(self):
raise NotImplementedError
DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
config.get_postgres_uri(),
isolation_level="REPEATABLE READ",
))
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
self.session_factory = session_factory
def __enter__(self):
self.session = self.session_factory() # type: Session
self.products = repository.SqlAlchemyRepository(self.session)
return super().__enter__()
def __exit__(self, *args):
super().__exit__(*args)
self.session.close()
def commit(self):
self.session.commit()
def rollback(self):
self.session.rollback()
我正在尝试在 Flask 上测试我的端点,但我无法让它回滚每次测试后插入的数据。
为了解决这个问题,我尝试安装包 pytest-flask-sqlalchemy
但出现以下错误:
'SqlAlchemyUnitOfWork' object has no attribute 'engine'
我不太明白 pytest-flask-sqlalchemy
是如何工作的,我不知道如何在测试后进行工作单元回滚事务。
是否可以按照作者实现的方式使其工作?
已编辑
可以通过以下存储库复制我的情况:
https://github.com/Santana94/CosmicPythonRollbackTest
通过克隆和 运行 make all
.
,您应该知道测试不会回滚以前的操作
最后,我必须在每次测试后实现回滚功能。
当我看到一个名为 pytest-postgresql
的包在其自身上实现它时,我开始工作了。我只是进行了调整以使测试回滚我正在使用的数据库数据。为此,我只需要在 conftest.py
:
上实现这个功能
@pytest.fixture(scope='function')
def db_session():
engine = create_engine(config.get_postgres_uri(), echo=False, poolclass=NullPool)
metadata.create_all(engine)
pyramid_basemodel.Session = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
pyramid_basemodel.bind_engine(
engine, pyramid_basemodel.Session, should_create=True, should_drop=True)
yield pyramid_basemodel.Session
transaction.commit()
metadata.drop_all(engine)
在那之后,如果我想回滚事务,我必须将 db_session
作为测试的参数:
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated(db_session):
orderid = random_orderid()
sku, othersku = random_sku(), random_sku('other')
earlybatch = random_batchref(1)
laterbatch = random_batchref(2)
otherbatch = random_batchref(3)
api_client.post_to_add_batch(laterbatch, sku, 100, '2011-01-02')
api_client.post_to_add_batch(earlybatch, sku, 100, '2011-01-01')
api_client.post_to_add_batch(otherbatch, othersku, 100, None)
r = api_client.post_to_allocate(orderid, sku, qty=3)
assert r.status_code == 202
r = api_client.get_allocation(orderid)
assert r.ok
assert r.json() == [
{'sku': sku, 'batchref': earlybatch},
]
可以在我的 GitHub 存储库中查看该实现的要求和其他方面的要求。
我正在研究“Cosmic Python”一书,第 6 章解释了如何使用工作单元模式来改变与 database/repository 的交互。
本书第 6 章可在此处访问: https://www.cosmicpython.com/book/chapter_06_uow.html
作者提供的代码如下:
from __future__ import annotations
import abc
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
from allocation import config
from allocation.adapters import repository
class AbstractUnitOfWork(abc.ABC):
products: repository.AbstractRepository
def __enter__(self) -> AbstractUnitOfWork:
return self
def __exit__(self, *args):
self.rollback()
@abc.abstractmethod
def commit(self):
raise NotImplementedError
@abc.abstractmethod
def rollback(self):
raise NotImplementedError
DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
config.get_postgres_uri(),
isolation_level="REPEATABLE READ",
))
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
self.session_factory = session_factory
def __enter__(self):
self.session = self.session_factory() # type: Session
self.products = repository.SqlAlchemyRepository(self.session)
return super().__enter__()
def __exit__(self, *args):
super().__exit__(*args)
self.session.close()
def commit(self):
self.session.commit()
def rollback(self):
self.session.rollback()
我正在尝试在 Flask 上测试我的端点,但我无法让它回滚每次测试后插入的数据。
为了解决这个问题,我尝试安装包 pytest-flask-sqlalchemy
但出现以下错误:
'SqlAlchemyUnitOfWork' object has no attribute 'engine'
我不太明白 pytest-flask-sqlalchemy
是如何工作的,我不知道如何在测试后进行工作单元回滚事务。
是否可以按照作者实现的方式使其工作?
已编辑
可以通过以下存储库复制我的情况:
https://github.com/Santana94/CosmicPythonRollbackTest
通过克隆和 运行 make all
.
最后,我必须在每次测试后实现回滚功能。
当我看到一个名为 pytest-postgresql
的包在其自身上实现它时,我开始工作了。我只是进行了调整以使测试回滚我正在使用的数据库数据。为此,我只需要在 conftest.py
:
@pytest.fixture(scope='function')
def db_session():
engine = create_engine(config.get_postgres_uri(), echo=False, poolclass=NullPool)
metadata.create_all(engine)
pyramid_basemodel.Session = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
pyramid_basemodel.bind_engine(
engine, pyramid_basemodel.Session, should_create=True, should_drop=True)
yield pyramid_basemodel.Session
transaction.commit()
metadata.drop_all(engine)
在那之后,如果我想回滚事务,我必须将 db_session
作为测试的参数:
@pytest.mark.usefixtures('postgres_db')
@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_202_and_batch_is_allocated(db_session):
orderid = random_orderid()
sku, othersku = random_sku(), random_sku('other')
earlybatch = random_batchref(1)
laterbatch = random_batchref(2)
otherbatch = random_batchref(3)
api_client.post_to_add_batch(laterbatch, sku, 100, '2011-01-02')
api_client.post_to_add_batch(earlybatch, sku, 100, '2011-01-01')
api_client.post_to_add_batch(otherbatch, othersku, 100, None)
r = api_client.post_to_allocate(orderid, sku, qty=3)
assert r.status_code == 202
r = api_client.get_allocation(orderid)
assert r.ok
assert r.json() == [
{'sku': sku, 'batchref': earlybatch},
]
可以在我的 GitHub 存储库中查看该实现的要求和其他方面的要求。