Python SQLALchemy 在编译查询的 where 子句中对列使用 pgp_sym_decrypt 而不是对变量使用 pgp_sym_encrypt

Python SQLALchemy use pgp_sym_decrypt for columns instead of pgp_sym_encrypt for variable in the where clause of the compiled query

我对每个 table 使用 class 模型,对每个 sql 操作使用 SQLAlchemy 的方法。

我安装 pgp_sym_encrypt 和 pgp_sym_decrypt 扩展 PostgreSQL

CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;

在 SQLAlchemy 文档的帮助下,我使用我的 class 用户,我在 pgp_sym_encrypt 的帮助下加密数据,PostgreSQL 的 pgcrypto 包的功能(我使用的版本13.4)

为了安全,我对用户的数据进行了加密我想使用postgreSQL的原生pgp_sym_encrypt和pgp_sym_decrypt原生函数。在我使用纯文本查询通过 psycopg2 执行此操作之前。由于我使用 SQLAlchemy ORM,通过它 Flask-SQLALchemy 扩展。

我找到了解决方案: https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing

但其中 None 显示了在查询中使用它们的示例

实际上我的代码如下:

import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY

db = SQLAlchemy()


class PGPString(TypeDecorator):
    impl = BYTEA

    cache_ok = True

    def __init__(self, passphrase):
        super(PGPString, self).__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        # convert the bind's type from PGPString to
        # String, so that it's passed to psycopg2 as is without
        # a dbapi.Binary wrapper
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)
    
    
class User(db.Model):
    __tablename__ = "user"
    __table_args__ = {'schema': 'private'}
    id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sexe = Column("sexe", String)
    username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
    firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
    lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
    email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
    password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
    registration_date = Column("registration_date", DateTime)
    validation_date = Column("validation_date", DateTime)
    role = Column("role", String)

    @classmethod
    def create_user(cls, **kw):
        """
        Create an User
        """
        obj = cls(**kw)
        db.session.add(obj)
        db.session.commit()

一切正常,当我使用我的 create_user 方法时,用户数据已加密。

但是如果在我的 class 中添加一个方法来检查用户是否存在,通过此方法检查用户名、电子邮件或其他数据:

class User(db.Model)
    ....
    @classmethod
    def check_user_exist(cls, **kwargs):
        """
        Check if user exist
        :param kwargs:
        :return:
        """
        exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
        return exists

这将产生以下查询:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role

FROM private."user";

WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
  AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
  AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);

这个查询 return None 因为当我们将 pgp_sym_encrypt 与实际的 ENCRYPTION_KEY 一起使用时,数据将以新的方式加密。

https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8 https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg

那么如何修改 SQLAlchemy 文档函数 PGPString(TypeDecorator): 以使用 pgp_sym_decrypt 解密列而不是加密变量,如下面的查询:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role
FROM private."user"
WHERE
      pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
    AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)  = 'email'
    AND   pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';

通常我在 Python 和 SQLAlchemy 方面做得很好,但这确实超出了我的技能范围,而且我真的不知道如何修改这个函数,因为它影响了完全未知的功能我.

目前我使用文本功能,但这很难看

例如另一种从用户名(加密数据)获取 id 的方法

    @classmethod
    def get_id(cls, username):
        query = """
        SELECT private.user.id
        FROM private."user"
        WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
        """
        q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
                                                                     username=username).first()
        return q.id

非常感谢您的帮助! 最好的问候

通过添加解决:

class PGPString(TypeDecorator):
    (...)
    def pgp_sym_decrypt(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase, type_=String)

到 PGPString 并像这样使用:

class User(db.session):
    (...)
    @classmethod
    def check_user_exist(cls, **kw):
        exists = db.session.query(cls).with_entities(cls.username)\
            .filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
            .filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
            .filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
            .first() is not None
    return exist

感谢 sqlalchemy 团队: https://github.com/sqlalchemy/sqlalchemy/discussions/7268