SQLAlchemy 将会话加入外部事务未按预期工作

SQLAlchemy Joining a Session into an External Transaction Not Working as Expected

我正在使用 pytest 为大型应用程序重写测试套件,并希望在每个测试函数之间进行隔离。我注意到,在 SAVEPOINT 中多次调用 commit 导致记录被输入到数据库中。我已经为以下示例提取了尽可能多的代码:

init.py

# Create the SQLAlchemy db instance
db: SQLAlchemy = SQLAlchemy(
    engine_options={"connect_args": {"options": "-c timezone=utc"}}
)
# Initialize Marshmallow
ma: Marshmallow = Marshmallow()

unleash = Unleash()

def create_app(config=None):
    # Create the Flask app
    app = Flask(__name__)

    # Flask is not autoloading FLASK_ENV anymore
    app.config.from_object("app.config.Testing")
    if not database_exists(app.config["SQLALCHEMY_DATABASE_URI"]):
        create_database(app.config["SQLALCHEMY_DATABASE_URI"])

    # initialize DB
    db.init_app(app)
    # Initialize Marshmallow
    ma.init_app(app)


    with app.app_context():
        # import models and setup blueprints
        ...

conftest.py

import os

import pytest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event
from sqlalchemy.orm import Session

import db_data
from app import create_app
from app import db as _db


@pytest.fixture(scope="session")
def app():
    """
    Returns session-wide application.
    """
    os.environ["FLASK_ENV"] = "testing"
    return create_app()


@pytest.fixture(scope="session")
def db(app: Flask, request):
    """
    Returns session-wide initialised database.
    """
    with app.app_context():
        _db.drop_all()
        _db.create_all()

        db_data.initialize_common_data(_db, app.logger)
        db_data.create_test_users(_db, app.logger)
        db_data.initialize_functional_test_data(_db, app.logger)

        yield _db

@pytest.fixture(scope="function", autouse=True)
def session(app: Flask, db: SQLAlchemy):
    """
    Returns function-scoped session.
    """
    # from https://docs.sqlalchemy.org/en/13/orm/session_transaction.html
    with app.app_context():
        connection = db.engine.connect()

        # begin a non-ORM transaction
        trans = connection.begin()

        # bind an individual Session to the connection
        sess = Session(bind=connection)

        # start the session in a SAVEPOINT...
        sess.begin_nested()

        # then each time that SAVEPOINT ends, reopen it
        @event.listens_for(sess, "after_transaction_end")
        def restart_savepoint(s, t):
            if t.nested and (t._parent is None or not t._parent.nested):
                s.expire_all()
                s.begin_nested()

        yield sess

        ### Cleanup ##
        # rollback - everything that happened with the Session above
        # (including calls to commit()) is rolled back.
        sess.close()
        trans.rollback()

        # return connection to the Engine
        connection.close()

conftestv2.py

@pytest.fixture(scope="function", autouse=True)
def session(app: Flask, db: SQLAlchemy):
    """
    Returns function-scoped session.
    """
    # from https://docs.sqlalchemy.org/en/13/orm/session_transaction.html
    with app.app_context():
        connection = db.engine.connect()

        # begin a non-ORM transaction
        trans = connection.begin()

        # start the session in a SAVEPOINT...
        db.session.begin_nested()

        # then each time that SAVEPOINT ends, reopen it
        @event.listens_for(db.session, "after_transaction_end")
        def restart_savepoint(s, t):
            if t.nested and (t._parent is None or not t._parent.nested):
                s.expire_all()
                s.begin_nested()

        # yield sess
        yield db.session

        ### Cleanup ##
        # rollback - everything that happened with the Session above
        # (including calls to commit()) is rolled back.
        db.session.close()
        trans.rollback()

        # return connection to the Engine
        connection.close()

test_user.py

from app.user.models import User
from app import db as _db

def test_list_trackers():
    print("SESSSS:", _db.session)
    _db.session.add(User(email="aaa@aaa.aaa"))
    _db.session.commit()

    _db.session.add(User(email="aaa@aaa.aaab"))
    _db.session.commit()

我尝试以此为指导:https://docs.sqlalchemy.org/en/13/orm/session_transaction.html#joining-a-session-into-an-external-transaction-such-as-for-test-suites

并查看了以下资源(以及许多其他资源):

我安装的相关包、版本:

$ pip list
Package                           Version
--------------------------------- ---------
coverage                          6.3.1
Faker                             12.1.0
Flask                             1.1.2
Flask-Cors                        3.0.10
Flask-Environments                0.1
Flask-HTTPAuth                    4.2.0
flask-marshmallow                 0.11.0
Flask-Migrate                     2.7.0
Flask-SQLAlchemy                  2.5.1
psycopg2-binary                   2.8.6
pytest                            6.2.3
pytest-cov                        3.0.0
SQLAlchemy                        1.3.18
SQLAlchemy-Paginator              0.2
sqlalchemy-stubs                  0.3
SQLAlchemy-Utils                  0.38.2
timezonefinder                    5.2

数据库在 docker-compose 中 运行 使用 postgres 12.2-alpine 图像。

在 SQLAlchemy 的 Gitter 社区的帮助下,我解决了这个问题。有两个问题需要解决:

  1. after_transaction_end 事件正在为每个单独的测试注册,但在测试结束后未被删除。因此,在每次测试之间调用了多个事件。
  2. db 固定装置生成的 _db 位于应用上下文中,它不应该在。

已更新conftest.py

@pytest.fixture(scope="session")
def db(app: Flask, request):
    """
    Returns session-wide initialised database.
    """
    with app.app_context():
        _db.drop_all()
        _db.create_all()

        db_data.initialize_common_data(_db, app.logger)
        db_data.create_test_users(_db, app.logger)
        db_data.initialize_functional_test_data(_db, app.logger)

        _db.session.close_all()

    yield _db


@pytest.fixture(scope="function", autouse=True)
def session(app: Flask, db: SQLAlchemy):
    """
    Returns function-scoped session.
    """
    # from https://docs.sqlalchemy.org/en/13/orm/session_transaction.html
    with app.app_context():
        connection = db.engine.connect()

        # begin a non-ORM transaction
        trans = connection.begin()

        # start the session in a SAVEPOINT...
        db.session.begin_nested()

        # then each time that SAVEPOINT ends, reopen it
        @event.listens_for(db.session, "after_transaction_end")
        def restart_savepoint(s, t):
            if t.nested and (t._parent is not None and not t._parent.nested):
                s.expire_all()
                s.begin_nested()

        # yield sess
        yield db.session

        ### Cleanup ##
        # rollback - everything that happened with the Session above
        # (including calls to commit()) is rolled back.
        event.remove(db.session, "after_transaction_end", restart_savepoint)

        db.session.close_all()
        trans.rollback()

        # return connection to the Engine
        connection.close()