Python SQLALchemy 在编译查询的 where 子句中对列使用 pgp_sym_decrypt 而不是对变量使用 pgp_sym_encrypt
Python SQLALchemy use pgp_sym_decrypt for columns instead of pgp_sym_encrypt for variable in the where clause of the compiled query
我对每个 table 使用 class 模型,对每个 sql 操作使用 SQLAlchemy 的方法。
我安装 pgp_sym_encrypt 和 pgp_sym_decrypt 扩展 PostgreSQL
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;
在 SQLAlchemy 文档的帮助下,我使用我的 class 用户,我在 pgp_sym_encrypt 的帮助下加密数据,PostgreSQL 的 pgcrypto 包的功能(我使用的版本13.4)
为了安全,我对用户的数据进行了加密我想使用postgreSQL的原生pgp_sym_encrypt和pgp_sym_decrypt原生函数。在我使用纯文本查询通过 psycopg2 执行此操作之前。由于我使用 SQLAlchemy ORM,通过它 Flask-SQLALchemy 扩展。
我找到了解决方案:
https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing
但其中 None 显示了在查询中使用它们的示例
实际上我的代码如下:
import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY
db = SQLAlchemy()
class PGPString(TypeDecorator):
impl = BYTEA
cache_ok = True
def __init__(self, passphrase):
super(PGPString, self).__init__()
self.passphrase = passphrase
def bind_expression(self, bindvalue):
# convert the bind's type from PGPString to
# String, so that it's passed to psycopg2 as is without
# a dbapi.Binary wrapper
bindvalue = type_coerce(bindvalue, String)
return func.pgp_sym_encrypt(bindvalue, self.passphrase)
def column_expression(self, col):
return func.pgp_sym_decrypt(col, self.passphrase)
class User(db.Model):
__tablename__ = "user"
__table_args__ = {'schema': 'private'}
id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
sexe = Column("sexe", String)
username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
registration_date = Column("registration_date", DateTime)
validation_date = Column("validation_date", DateTime)
role = Column("role", String)
@classmethod
def create_user(cls, **kw):
"""
Create an User
"""
obj = cls(**kw)
db.session.add(obj)
db.session.commit()
一切正常,当我使用我的 create_user 方法时,用户数据已加密。
但是如果在我的 class 中添加一个方法来检查用户是否存在,通过此方法检查用户名、电子邮件或其他数据:
class User(db.Model)
....
@classmethod
def check_user_exist(cls, **kwargs):
"""
Check if user exist
:param kwargs:
:return:
"""
exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
return exists
这将产生以下查询:
SELECT private."user".id AS private_user_id,
private."user".sexe AS private_user_sexe,
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
private."user".registration_date AS private_user_registration_date,
private."user".validation_date AS private_user_validation_date,
private."user".role AS private_user_role
FROM private."user";
WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);
这个查询 return None
因为当我们将 pgp_sym_encrypt 与实际的 ENCRYPTION_KEY 一起使用时,数据将以新的方式加密。
https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8
https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg
那么如何修改 SQLAlchemy 文档函数 PGPString(TypeDecorator):
以使用 pgp_sym_decrypt 解密列而不是加密变量,如下面的查询:
SELECT private."user".id AS private_user_id,
private."user".sexe AS private_user_sexe,
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
private."user".registration_date AS private_user_registration_date,
private."user".validation_date AS private_user_validation_date,
private."user".role AS private_user_role
FROM private."user"
WHERE
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) = 'email'
AND pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';
通常我在 Python 和 SQLAlchemy 方面做得很好,但这确实超出了我的技能范围,而且我真的不知道如何修改这个函数,因为它影响了完全未知的功能我.
目前我使用文本功能,但这很难看
例如另一种从用户名(加密数据)获取 id 的方法
@classmethod
def get_id(cls, username):
query = """
SELECT private.user.id
FROM private."user"
WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
"""
q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
username=username).first()
return q.id
非常感谢您的帮助!
最好的问候
通过添加解决:
class PGPString(TypeDecorator):
(...)
def pgp_sym_decrypt(self, col):
return func.pgp_sym_decrypt(col, self.passphrase, type_=String)
到 PGPString 并像这样使用:
class User(db.session):
(...)
@classmethod
def check_user_exist(cls, **kw):
exists = db.session.query(cls).with_entities(cls.username)\
.filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
.filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
.filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
.first() is not None
return exist
感谢 sqlalchemy 团队:
https://github.com/sqlalchemy/sqlalchemy/discussions/7268
我对每个 table 使用 class 模型,对每个 sql 操作使用 SQLAlchemy 的方法。
我安装 pgp_sym_encrypt 和 pgp_sym_decrypt 扩展 PostgreSQL
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;
在 SQLAlchemy 文档的帮助下,我使用我的 class 用户,我在 pgp_sym_encrypt 的帮助下加密数据,PostgreSQL 的 pgcrypto 包的功能(我使用的版本13.4)
为了安全,我对用户的数据进行了加密我想使用postgreSQL的原生pgp_sym_encrypt和pgp_sym_decrypt原生函数。在我使用纯文本查询通过 psycopg2 执行此操作之前。由于我使用 SQLAlchemy ORM,通过它 Flask-SQLALchemy 扩展。
我找到了解决方案: https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing
但其中 None 显示了在查询中使用它们的示例
实际上我的代码如下:
import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY
db = SQLAlchemy()
class PGPString(TypeDecorator):
impl = BYTEA
cache_ok = True
def __init__(self, passphrase):
super(PGPString, self).__init__()
self.passphrase = passphrase
def bind_expression(self, bindvalue):
# convert the bind's type from PGPString to
# String, so that it's passed to psycopg2 as is without
# a dbapi.Binary wrapper
bindvalue = type_coerce(bindvalue, String)
return func.pgp_sym_encrypt(bindvalue, self.passphrase)
def column_expression(self, col):
return func.pgp_sym_decrypt(col, self.passphrase)
class User(db.Model):
__tablename__ = "user"
__table_args__ = {'schema': 'private'}
id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
sexe = Column("sexe", String)
username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
registration_date = Column("registration_date", DateTime)
validation_date = Column("validation_date", DateTime)
role = Column("role", String)
@classmethod
def create_user(cls, **kw):
"""
Create an User
"""
obj = cls(**kw)
db.session.add(obj)
db.session.commit()
一切正常,当我使用我的 create_user 方法时,用户数据已加密。
但是如果在我的 class 中添加一个方法来检查用户是否存在,通过此方法检查用户名、电子邮件或其他数据:
class User(db.Model)
....
@classmethod
def check_user_exist(cls, **kwargs):
"""
Check if user exist
:param kwargs:
:return:
"""
exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
return exists
这将产生以下查询:
SELECT private."user".id AS private_user_id,
private."user".sexe AS private_user_sexe,
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
private."user".registration_date AS private_user_registration_date,
private."user".validation_date AS private_user_validation_date,
private."user".role AS private_user_role
FROM private."user";
WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);
这个查询 return None
因为当我们将 pgp_sym_encrypt 与实际的 ENCRYPTION_KEY 一起使用时,数据将以新的方式加密。
https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8 https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg
那么如何修改 SQLAlchemy 文档函数 PGPString(TypeDecorator):
以使用 pgp_sym_decrypt 解密列而不是加密变量,如下面的查询:
SELECT private."user".id AS private_user_id,
private."user".sexe AS private_user_sexe,
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
private."user".registration_date AS private_user_registration_date,
private."user".validation_date AS private_user_validation_date,
private."user".role AS private_user_role
FROM private."user"
WHERE
pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) = 'email'
AND pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';
通常我在 Python 和 SQLAlchemy 方面做得很好,但这确实超出了我的技能范围,而且我真的不知道如何修改这个函数,因为它影响了完全未知的功能我.
目前我使用文本功能,但这很难看
例如另一种从用户名(加密数据)获取 id 的方法
@classmethod
def get_id(cls, username):
query = """
SELECT private.user.id
FROM private."user"
WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
"""
q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
username=username).first()
return q.id
非常感谢您的帮助! 最好的问候
通过添加解决:
class PGPString(TypeDecorator):
(...)
def pgp_sym_decrypt(self, col):
return func.pgp_sym_decrypt(col, self.passphrase, type_=String)
到 PGPString 并像这样使用:
class User(db.session):
(...)
@classmethod
def check_user_exist(cls, **kw):
exists = db.session.query(cls).with_entities(cls.username)\
.filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
.filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
.filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
.first() is not None
return exist
感谢 sqlalchemy 团队: https://github.com/sqlalchemy/sqlalchemy/discussions/7268