在 Flask SQLAlchemy 中自动更新其他表

Updating other Tables Automatically in Flask SQLAlchemy

我正在学习 Flask/SQLAlchemy 并且正在尝试创建一个简单的数据库。某些 table 可以根据其他 table 计算得出,但我不太确定应该在哪里、以何种方式添加该逻辑。

作为一个简化的问题,假设这个数据库中有三个table(*是主键,**是外键):

User: | id* | name | username | password |
Transactions: | id* | user_id** | date | quantity | value |
Holdings: | id* | user_id** | date | holdings |

然后我使用 flask-admin 在 Transactions table 中手动添加一个条目。假设我们添加了条目: user_id = 1, date = '5/31/2022', quantity = 10, value = $10。

这里的目标是当添加这个条目时,Flask 知道通过计算一些 Python 函数自动将另一个条目添加到 Holdings table。假设 Holdings.holdings = Transactions.quantity * Transactions.value,目标是自动将以下条目添加到 Holdings table:user_id = 1, date = '5/31/2022', holdings = $100。

关于如何做到这一点有什么建议吗?

在代码中:

from flask import Flask
from flask_login import UserMixin

# Flask application object for the example.
app = Flask(__name__)
# Flask boilerplate code, register blueprints, etc

# NOTE(review): SQLAlchemy is not imported anywhere in this snippet. The text
# below states the project uses flask_sqlalchemy, so this presumably needs
# `from flask_sqlalchemy import SQLAlchemy` -- confirm.
# The extension is created unbound here and attached to `app` later via
# db.init_app(app).
db = SQLAlchemy()

# Database Models

class User(db.Model, UserMixin):
    """User account model (columns: id, name, username, password).

    Also inherits flask_login's ``UserMixin``.
    """
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False)
    # Required and unique across all users.
    username = db.Column(db.String(128), unique=True, nullable=False)
    # NOTE(review): declared as a plain String(128); nothing in this snippet
    # shows hashing -- confirm how the value is produced before storing it.
    password = db.Column(db.String(128), nullable=False)

class Transactions(db.Model):
    """A single transaction recorded for a user.

    The documented schema for this table is
    ``| id | user_id | date | quantity | value |`` and the ``after_insert``
    listener computes ``quantity * value``, so both columns are declared
    here; without them that listener fails with ``AttributeError``.
    """
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Foreign key to the owning user ('user.id').
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User')

    date = db.Column(db.DateTime(timezone=True), nullable=False)
    # Inputs of the holdings calculation (holdings = quantity * value).
    quantity = db.Column(db.Float())
    value = db.Column(db.Float())
    # Retained from the original model so existing references keep working.
    transaction = db.Column(db.Float())

class Holdings(db.Model):
    """Holdings entry for a user at a given date.

    Per the description above, ``holdings`` is meant to be derived from a
    transaction as ``quantity * value``.
    """
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Foreign key to the owning user ('user.id').
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User')

    date = db.Column(db.DateTime(timezone=True), nullable=False)
    holdings = db.Column(db.Float())


# Attach the SQLAlchemy extension (created above without an app) to the Flask app.
db.init_app(app)


更新:这主要是基于 @jorzel 回答的逻辑,并做了一些改动,因为我忘了提到我正在使用 flask_sqlalchemy。

@event.listens_for(Transactions, 'after_insert')
def UpdateHoldings(mapper, connection, transaction):
    """Insert a Holdings row whenever a Transactions row is inserted.

    Args:
        mapper: The mapper of the inserted object (unused).
        connection: The Connection the flush is using to emit the INSERT.
        transaction: The Transactions instance that was just inserted.

    All SQL is emitted on ``connection``. Mapper-level flush hooks such as
    ``after_insert`` run in the middle of a flush, where calling
    ``Session.add()`` / ``Session.commit()`` is not supported; statements
    sent through ``connection`` take part in the same database transaction
    as the triggering INSERT.
    """
    holdings_val = transaction.quantity * transaction.value

    # Adding new entry
    holdings_tbl = Holdings.__table__
    connection.execute(
        holdings_tbl.insert().values(
            user_id=transaction.user_id,
            date=transaction.date,
            holdings=holdings_val,
        )
    )

    # (Not mentioned in OP, but if we want to update some table)
    # ( And assuming this table does not have date field )
    another_tbl = AnotherTable.__table__
    connection.execute(
        another_tbl.update()
        .where(another_tbl.c.user_id == transaction.user_id)
        .values(holdings=holdings_val)
    )

注意:出于某种原因,调用 db.session.commit() 会引发 This session is closed 错误;但不调用它似乎也能正常工作。

我准备了一个使用 sqlalchemy 事件 after_insert 处理的简单示例。

from contextlib import contextmanager
from datetime import datetime

from sqlalchemy import create_engine, event, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker

from tutorial.models import Transaction, Holding, User

# Database URL used by the example.
# NOTE(review): a SQLite URL with no path is presumably an in-memory
# database -- confirm against the SQLAlchemy engine URL documentation.
DB_URI = "sqlite://"

# Engine handed to get_session() by the tutorial code below.
engine = create_engine(DB_URI)
# Declarative base built on an explicit MetaData object. The models used in
# this snippet are imported from tutorial.models; Base itself is not
# referenced again in the code shown here.
metadata = MetaData()
Base = declarative_base(metadata=metadata)


@contextmanager
def transaction_scope(session):
    """Context manager that commits *session* on success and rolls back on error.

    Yields the same session object it was given. When the managed block
    finishes without raising, ``session.commit()`` is called. If the block
    or the commit itself raises an ``Exception``, ``session.rollback()`` is
    called and the exception is re-raised to the caller.
    """
    try:
        yield session
        # Inside the try on purpose: a failing commit is rolled back as well.
        session.commit()
    except Exception:
        session.rollback()
        raise


def get_session(bind):
    """Return a new session bound to *bind*.

    Callers in this file pass either the module-level engine or the
    connection received by an event listener.
    """
    # A fresh factory is built on every call and instantiated right away.
    return sessionmaker(bind)()


@event.listens_for(Transaction, 'after_insert')
def receive_after_insert(mapper, connection, target):
    """Create the matching Holding after a Transaction has been inserted.

    Args:
        mapper: Mapper of the inserted object (unused).
        connection: Connection on which the Transaction INSERT was emitted;
            the Holding is written through a session bound to it.
        target: The Transaction instance that was just inserted.
    """
    # Session on the listener's connection, wrapped in commit/rollback handling.
    with get_session(connection) as session, transaction_scope(session):
        session.add(
            Holding(
                user_id=target.user_id,
                holdings=target.value * target.quantity,
                date=target.date,
            )
        )

def add_user(session) -> int:
    """Store a fixed test user through *session* and return its ``id``.

    The insert runs inside ``transaction_scope``, so it is committed when the
    block exits normally; the id is read after that point.
    """
    with transaction_scope(session):
        created = User(name='Test', username='test', password='secret')
        session.add(created)
        # Flush so the INSERT is emitted before the scope commits.
        session.flush()
    return created.id

def tutorial_after_insert(with_raise=False):
    """Run the demo: add a user, then insert one Transaction for that user.

    Args:
        with_raise: When true, raise ``Exception('exception')`` after the
            Transaction has been flushed, so the surrounding
            ``transaction_scope`` rolls the session back.
    """
    with get_session(engine) as session:
        user_id = add_user(session)
        with transaction_scope(session):
            session.add(
                Transaction(
                    quantity=10,
                    value=11.2,
                    user_id=user_id,
                    date=datetime.utcnow(),
                )
            )
            # The flush emits the INSERT, which fires the after_insert listener.
            session.flush()
            if with_raise:
                raise Exception('exception')

# Demo driver: run the tutorial once, print any exception it raises, and
# always list the stored Holding and Transaction rows afterwards.
try:
    tutorial_after_insert(with_raise=False)
except Exception as e:
    print(e)
finally:
    # A separate session is opened just for reading the results back.
    with get_session(engine) as session:
        print(session.query(Holding).all())
        print(session.query(Transaction).all())

如果没有抛出异常,你会得到如下结果:

[Holding(id=1, user_id=1, value=112.0, date=2022-06-01 08:27:26.307473)]
[Transaction(id=1, user_id=1, quantity=10, value=11.2, date=2022-06-01 08:27:26.307473)]

如果出现异常,则不会添加任何记录。

我已将类似的示例添加到此 repo 中:https://github.com/jorzel/sqlalchemy-events-tutorial