在 Flask-SQLAlchemy 中隔离 py.test 个数据库会话
Isolating py.test DB sessions in Flask-SQLAlchemy
我正在尝试使用 Flask-SQLAlchemy 构建一个 Flask 应用程序;我使用 pytest 来测试数据库。问题之一似乎是在不同测试之间创建隔离的数据库会话。
我编写了一个最小的完整示例来突出问题,请注意 test_user_schema1()
和 test_user_schema2()
是相同的。
文件名:test_db.py
from models import User
def test_user_schema1(session):
person_name = 'Fran Clan'
uu = User(name=person_name)
session.add(uu)
session.commit()
assert uu.id==1
assert uu.name==person_name
def test_user_schema2(session):
person_name = 'Stan Clan'
uu = User(name=person_name)
session.add(uu)
session.commit()
assert uu.id==1
assert uu.name==person_name
如果数据库在我的测试之间确实是隔离的,那么两个测试都应该通过。然而,最后的测试总是失败,因为我还没有找到让数据库会话正确回滚的方法。
conftest.py
根据我在 Alex Michael's blog post 中看到的内容使用了以下代码,但是这个 fixture 代码中断了,因为它显然没有隔离 fixture 之间的数据库会话。
@pytest.yield_fixture(scope='function')
def session(app, db):
connection = db.engine.connect()
transaction = connection.begin()
#options = dict(bind=connection, binds={})
options = dict(bind=connection)
session = db.create_scoped_session(options=options)
yield session
# Finalize test here
transaction.rollback()
connection.close()
session.remove()
为了这个问题的目的,我构建了a gist,其中包含了你重现它所需要的一切;你可以用 git clone https://gist.github.com/34fa8d274fc4be240933.git
.
克隆它
我正在使用以下软件包...
Flask==0.10.1
Flask-Bootstrap==3.3.0.1
Flask-Migrate==1.3.0
Flask-Moment==0.4.0
Flask-RESTful==0.3.1
Flask-Script==2.0.5
Flask-SQLAlchemy==2.0
Flask-WTF==0.11
itsdangerous==0.24
pytest==2.6.4
Werkzeug==0.10.1
两个问题:
- 为什么现状被打破了?同样的 py.test fixture 似乎对其他人有用。
- 如何解决这个问题才能正常工作?
1.
根据Session Basics - SQLAlchemy documentation:
commit()
is used to commit the current transaction. It always issues flush() beforehand to flush any remaining state to the database; this is independent of the “autoflush” setting. ....
所以transaction.rollback()
在session中的fixture函数没有生效,因为事务已经提交了。
2.
将 fixture 的范围从 session
改为 function
以便每次都清除数据库。
@pytest.yield_fixture(scope='function')
def app(request):
...
@pytest.yield_fixture(scope='function')
def db(app, request):
...
顺便说一句,如果使用内存sqlite数据库,不需要删除db文件,速度会更快:
DB_URI = 'sqlite://' # SQLite :memory: database
...
@pytest.yield_fixture(scope='function')
def db(app, request):
_db.app = app
_db.create_all()
yield _db
_db.drop_all()
Alex Michael's blog post is not working because it's incomplete. According to the sqlalchemy documentation on joining sessions中介绍的方法,Alex的解决方案只有在没有回滚调用的情况下才有效。另一个区别是,与 Alex 博客上的作用域会话相比,sqla 文档中使用的是原始 Session
对象。
对于 flask-sqlalchemy,作用域会话会在 request teardown 上自动删除。调用 session.remove
,这会在后台发出回滚。要在测试范围内支持回滚,请使用 SAVEPOINT
:
import sqlalchemy as sa
@pytest.yield_fixture(scope='function')
def db_session(db):
"""
Creates a new database session for a test. Note you must use this fixture
if your test connects to db.
Here we not only support commit calls but also rollback calls in tests.
"""
connection = db.engine.connect()
transaction = connection.begin()
options = dict(bind=connection, binds={})
session = db.create_scoped_session(options=options)
session.begin_nested()
# session is actually a scoped_session
# for the `after_transaction_end` event, we need a session instance to
# listen for, hence the `session()` call
@sa.event.listens_for(session(), 'after_transaction_end')
def restart_savepoint(sess, trans):
if trans.nested and not trans._parent.nested:
session.expire_all()
session.begin_nested()
db.session = session
yield session
session.remove()
transaction.rollback()
connection.close()
不过您的数据库必须支持 SAVEPOINT
。
我正在尝试使用 Flask-SQLAlchemy 构建一个 Flask 应用程序;我使用 pytest 来测试数据库。问题之一似乎是在不同测试之间创建隔离的数据库会话。
我编写了一个最小的完整示例来突出问题,请注意 test_user_schema1()
和 test_user_schema2()
是相同的。
文件名:test_db.py
from models import User
def test_user_schema1(session):
person_name = 'Fran Clan'
uu = User(name=person_name)
session.add(uu)
session.commit()
assert uu.id==1
assert uu.name==person_name
def test_user_schema2(session):
person_name = 'Stan Clan'
uu = User(name=person_name)
session.add(uu)
session.commit()
assert uu.id==1
assert uu.name==person_name
如果数据库在我的测试之间确实是隔离的,那么两个测试都应该通过。然而,最后的测试总是失败,因为我还没有找到让数据库会话正确回滚的方法。
conftest.py
根据我在 Alex Michael's blog post 中看到的内容使用了以下代码,但是这个 fixture 代码中断了,因为它显然没有隔离 fixture 之间的数据库会话。
@pytest.yield_fixture(scope='function')
def session(app, db):
connection = db.engine.connect()
transaction = connection.begin()
#options = dict(bind=connection, binds={})
options = dict(bind=connection)
session = db.create_scoped_session(options=options)
yield session
# Finalize test here
transaction.rollback()
connection.close()
session.remove()
为了这个问题的目的,我构建了a gist,其中包含了你重现它所需要的一切;你可以用 git clone https://gist.github.com/34fa8d274fc4be240933.git
.
我正在使用以下软件包...
Flask==0.10.1
Flask-Bootstrap==3.3.0.1
Flask-Migrate==1.3.0
Flask-Moment==0.4.0
Flask-RESTful==0.3.1
Flask-Script==2.0.5
Flask-SQLAlchemy==2.0
Flask-WTF==0.11
itsdangerous==0.24
pytest==2.6.4
Werkzeug==0.10.1
两个问题:
- 为什么现状被打破了?同样的 py.test fixture 似乎对其他人有用。
- 如何解决这个问题才能正常工作?
1.
根据Session Basics - SQLAlchemy documentation:
commit()
is used to commit the current transaction. It always issues flush() beforehand to flush any remaining state to the database; this is independent of the “autoflush” setting. ....
所以transaction.rollback()
在session中的fixture函数没有生效,因为事务已经提交了。
2.
将 fixture 的范围从 session
改为 function
以便每次都清除数据库。
@pytest.yield_fixture(scope='function')
def app(request):
...
@pytest.yield_fixture(scope='function')
def db(app, request):
...
顺便说一句,如果使用内存sqlite数据库,不需要删除db文件,速度会更快:
DB_URI = 'sqlite://' # SQLite :memory: database
...
@pytest.yield_fixture(scope='function')
def db(app, request):
_db.app = app
_db.create_all()
yield _db
_db.drop_all()
Alex Michael's blog post is not working because it's incomplete. According to the sqlalchemy documentation on joining sessions中介绍的方法,Alex的解决方案只有在没有回滚调用的情况下才有效。另一个区别是,与 Alex 博客上的作用域会话相比,sqla 文档中使用的是原始 Session
对象。
对于 flask-sqlalchemy,作用域会话会在 request teardown 上自动删除。调用 session.remove
,这会在后台发出回滚。要在测试范围内支持回滚,请使用 SAVEPOINT
:
import sqlalchemy as sa
@pytest.yield_fixture(scope='function')
def db_session(db):
"""
Creates a new database session for a test. Note you must use this fixture
if your test connects to db.
Here we not only support commit calls but also rollback calls in tests.
"""
connection = db.engine.connect()
transaction = connection.begin()
options = dict(bind=connection, binds={})
session = db.create_scoped_session(options=options)
session.begin_nested()
# session is actually a scoped_session
# for the `after_transaction_end` event, we need a session instance to
# listen for, hence the `session()` call
@sa.event.listens_for(session(), 'after_transaction_end')
def restart_savepoint(sess, trans):
if trans.nested and not trans._parent.nested:
session.expire_all()
session.begin_nested()
db.session = session
yield session
session.remove()
transaction.rollback()
connection.close()
不过您的数据库必须支持 SAVEPOINT
。