Python SQLAlchemy - 更新子模型时更新父模型的时间戳
Python SQLAlchemy - Update time stamp on parent model when child is updated
我创建了两个模型,Basket
和 BasketItem
,如下所示。一个篮子可以有很多篮子项目,所以我使用外键来创建这种关系。
您可以看到 Basket
模型有一个 updated_at
字段,其中包含最后一次更新篮子的时间戳。我希望在添加、删除或更新 BasketItem
时更新此字段。
我添加了一个事件侦听器,但我的解决方案似乎不是很优雅。有更好的方法吗?
class Basket(Base):
__tablename__ = "baskets"
id = Column(String(45), primary_key=True, default=uuid4)
shop = Column(String(45), nullable=False)
currency_code = Column(String(3), nullable=False)
total = Column(String(15), nullable=False, default=0)
status = Column(Enum("open", "closed"), nullable=False, default="open")
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.utcnow)
items = relationship("BasketItem", backref="basket")
class BasketItem(Base):
__tablename__ = "basket_items"
basket_id = Column(String(45), ForeignKey('baskets.id'), primary_key=True)
product_url = Column(String(90), nullable=False, primary_key=True)
quantity = Column(Integer, nullable=False)
@event.listens_for(BasketItem, 'after_insert')
@event.listens_for(BasketItem, 'after_update')
@event.listens_for(BasketItem, 'after_delete')
def basket_item_change(mapper, connection, target):
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
basket = db_session.query(Basket).get(target.basket_id)
basket.updated_at = datetime.datetime.utcnow()
db_session.commit()
我认为使用事件侦听器很好,但您可以执行以下操作。当您添加一个新的 basketitem 时,我确定您首先查询父项,然后将这个子项与该父项一起添加。如果是那么:
basket = Basket.query.get(id)
if basket:
basket.update()
# you can perform delete or update too
item = BasketItem(basket=basket, ...)
db.session.add(item)
db.session.commit()
class Basket(db.Model):
...
def update(self):
self.updated_at = datetime.utcnow()
db.session.add(self)
db.session.commit()
我创建了两个模型,Basket
和 BasketItem
,如下所示。一个篮子可以有很多篮子项目,所以我使用外键来创建这种关系。
您可以看到 Basket
模型有一个 updated_at
字段,其中包含最后一次更新篮子的时间戳。我希望在添加、删除或更新 BasketItem
时更新此字段。
我添加了一个事件侦听器,但我的解决方案似乎不是很优雅。有更好的方法吗?
class Basket(Base):
__tablename__ = "baskets"
id = Column(String(45), primary_key=True, default=uuid4)
shop = Column(String(45), nullable=False)
currency_code = Column(String(3), nullable=False)
total = Column(String(15), nullable=False, default=0)
status = Column(Enum("open", "closed"), nullable=False, default="open")
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.utcnow)
items = relationship("BasketItem", backref="basket")
class BasketItem(Base):
__tablename__ = "basket_items"
basket_id = Column(String(45), ForeignKey('baskets.id'), primary_key=True)
product_url = Column(String(90), nullable=False, primary_key=True)
quantity = Column(Integer, nullable=False)
@event.listens_for(BasketItem, 'after_insert')
@event.listens_for(BasketItem, 'after_update')
@event.listens_for(BasketItem, 'after_delete')
def basket_item_change(mapper, connection, target):
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
basket = db_session.query(Basket).get(target.basket_id)
basket.updated_at = datetime.datetime.utcnow()
db_session.commit()
我认为使用事件侦听器很好,但您可以执行以下操作。当您添加一个新的 basketitem 时,我确定您首先查询父项,然后将这个子项与该父项一起添加。如果是那么:
basket = Basket.query.get(id)
if basket:
basket.update()
# you can perform delete or update too
item = BasketItem(basket=basket, ...)
db.session.add(item)
db.session.commit()
class Basket(db.Model):
...
def update(self):
self.updated_at = datetime.utcnow()
db.session.add(self)
db.session.commit()