Flask-Sqlalchemy 并行请求避免竞争条件

Flask-Sqlachlemy parallel requests avoid raise condition

我使用具有两个个人资料图像的用户的一对多模型。 这个想法是在存储了两个图像后发送一封电子邮件。 为此,两个上传调用从前端并行发送到后端。

我遇到的问题是,这可能会导致存储两个图像但图像计数检查失败的情况,因为每个调用都在其自己的请求范围内运行 session/transaction。

我现在的问题是如何使用 Flask-Sqlalchemy 对并行执行的烧瓶请求可靠地执行图像计数检查

class User(Mixins, db.Model):
    __tablename__ = 'user'
    
    first_name = db.Column(db.String(1000), unique=False, nullable=True)
    surname = db.Column(db.String(1000), unique=False, nullable=True)
    email = db.Column(db.String(120, collation='NOCASE'), unique=True, index=True, nullable=False)
    
    images: {} = db.relationship('ProfileImage',
                                 back_populates="user",
                                 cascade='all,delete-orphan')

class ProfileImage(Mixins, db.Model):
    __tablename__ = 'profile_image'
    file_name = db.Column(db.String, unique=False, index=False, nullable=True)
    mime_type = db.Column(db.String, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship("User", back_populates="images")

def file_upload(request, user_id: int, side: str) -> int:
    try:

        file = image_service.get_image_from_request(request, side)
        if file and is_valid_mime_type(file.get('mime_type', None)):
            user = user_service.load_user(user_id)
            image = ProfileImage(side=side,
                                 file_name=file.get('name', None),
                                 mime_type='image/jpg',
                                 created_at=datetime.utcnow(),
                                 )
            user.images.append(image)
            db.session.add(user)
            db.session.commit()
            if len(user.images) ==2:
                send_email(user)
            return CREATED
    except ValueError as e:
        current_app.logger.error(repr(e))
    return BAD_REQUEST

如果可能的话,我会尽量避免这种检查。请注意,他们也有可能 发送一封电子邮件,这有时比根本不发送任何电子邮件更糟糕。

... 但是强制同步的一种方法是使用 ROW 锁。您不能锁定正在插入的行,但可以锁定用户的行。然后一个图像创建任务被迫等待另一个任务完成。这可能很难跟踪,而且您不想到处都被锁定,因此您应该将此作为最后的手段,但 SELECT FOR UPDATE 的工作方式如下:

# User's table row is locked until session is commited or rolled back.
user = session.query(User).filter(User.id == current_user_id).with_for_update().first()
# Or session.query(User).get(current_user_id, with_for_update=True)

# Manipulate the user's profile images.
user.images.append(new_profile_image)

# I'd use a direct query in case the user.images are not accurate for some reason.
image_count = session.query(func.count(ProfileImage.id)).filter(ProfileImage.user == user).scalar()
if image_count == 2:
    send_email(user)
# Release user row lock and commit changes.
session.commit()

这会强制一个作者等待另一个作者提交。了解 postgresql select for update row locking and sqlalchemy's with_for_update interface.