使用 Flask-SQLAlchemy 和 Flask-Marshmallow 聚合连接查询
Aggreate join query using Flask-SQLAlchemy and Flask-Marshmallow
我正在使用 Flask-SQLAlchemy 和 Flask-Marshmallow 针对 SQL 服务器实例提取数据。我尝试复制的查询如下所示
SELECT
o.[ProductId],
[AccountName] = a.[Name],
[AccountDescription] = a.[Description],
[TotalSize] = SUM(d.[Size])
FROM
[OrderDetail] d
JOIN
[Order] o
ON
d.[OrderId] = o.[Id]
JOIN
[Account] a
ON
d.[AccountId] = a.[Id]
WHERE
o.[Timestamp] <= @asOfDate
GROUP BY
o.[ProductId],
a.[Name],
a.[Description]
我很难将其翻译成 SQLAlchemy,特别是 Flask-SQLAlchemy,它的处理方式似乎略有不同。
这是我的尝试,位于我的路由函数中。此查询执行,但 SQL 即 运行 针对服务器只是三个 table 的连接——它不进行聚合或分组依据。我认为它还会选择每一列,而不仅仅是我在 with_entities
.
中指定的那些
"""views.py"""
@app.route('/product_sizes')
def get_product_sizes() -> Response:
as_of_date_str = request.args.get('asOfDate')
query = OrderDetail.query
.with_entities(Order.product_id.label('product_id'),
Account.name.label('account_name'),
Account.description.label('account_description'),
func.sum(OrderDetail.size).label('total_size')) \
.group_by('product_id', 'account_name', 'account_description')
if as_of_date_str:
as_of_date = datetime.strptime(as_of_date_str, '%Y-%m-%d')
query = query.join(OrderDetail.order).filter(Order.timestamp <= as_of_date)
result = schemas.product_size_schema.dump(query).data
json_string = json.dumps(result, cls=DefaultJsonEncoder)
return Response(response=json_string, status=200, mimetype="application/json")
我的问题是
- 如何修复查询,使其表现得像上面的 SQL 查询?
- 这大概就是 "correct" 做事的方式吗?我是 SQLAlchemy 和 Flask 的新手,所以我不确定正常的处理方式。例如,为每一列添加标签,然后按这些字符串进行分组似乎有点不对劲——我本来希望能够按我的模型 类 上的列属性进行分组。在尝试过滤加入的 table 时也调用
join
似乎是多余的——那个加入不是已经在 db.relationship
属性 中编码了吗?
我的模型和架构如下。
"""models.py"""
class Account(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
name = db.Column('Name', db.Unicode(250))
description = db.Column('Description', db.Unicode)
class Order(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
product_id = db.Column('ProductId', db.Integer)
timestamp = db.Column('TradeTimestamp', db.DateTime)
class OrderDetail(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
order_id = db.Column('OrderId', db.ForeignKey('Order.Id'), index=True)
account_id = db.Column('AccountId', db.ForeignKey('Account.Id'), index=True)
size = db.Column('Size', db.Numeric(19, 6), nullable=False)
account = db.relationship('Account', primaryjoin='OrderDetail.account_id == Account._id', backref='order_details', lazy='joined')
order = db.relationship('Trade', primaryjoin='OrderDetail.order_id == Order._id', backref='order_details', lazy='joined')
"""schemas.py"""
class ProductSizeSchema(ma.ModelSchema):
"""Schema for serializing product size objects"""
product_id = fields.Int()
account_name = fields.Str()
account_description = fields.Str()
total_size = fields.Decimal(19, 6)
product_size_schema = ProductSizeSchema()
product_sizes_schema = ProductSizeSchema(many=True)
看了一圈,发现如果不方便,不使用Flask-SQLAlchemy查询方法(例如MyModel.query.<something>
)也没什么大不了的。所以我只是以正常的 SQLAlchemy 方式使用 db.session.query()
。
query = db.session \
.query(Order.instrument_id,
Account.name.label("account_name"),
Account.description.label("account_description"),
func.sum(OrderDetail.size).label("total_size")) \
.join(OrderDetail.order) \
.join(OrderDetail.account) \
.group_by(Order.instrument_id, Account.name, Account.description)
if as_of_date_str:
as_of_date = datetime.strptime(as_of_date_str, '%Y-%m-%d')
query = query.filter(Order.timestamp <= as_of_date)
我正在使用 Flask-SQLAlchemy 和 Flask-Marshmallow 针对 SQL 服务器实例提取数据。我尝试复制的查询如下所示
SELECT
o.[ProductId],
[AccountName] = a.[Name],
[AccountDescription] = a.[Description],
[TotalSize] = SUM(d.[Size])
FROM
[OrderDetail] d
JOIN
[Order] o
ON
d.[OrderId] = o.[Id]
JOIN
[Account] a
ON
d.[AccountId] = a.[Id]
WHERE
o.[Timestamp] <= @asOfDate
GROUP BY
o.[ProductId],
a.[Name],
a.[Description]
我很难将其翻译成 SQLAlchemy,特别是 Flask-SQLAlchemy,它的处理方式似乎略有不同。
这是我的尝试,位于我的路由函数中。此查询执行,但 SQL 即 运行 针对服务器只是三个 table 的连接——它不进行聚合或分组依据。我认为它还会选择每一列,而不仅仅是我在 with_entities
.
"""views.py"""
@app.route('/product_sizes')
def get_product_sizes() -> Response:
as_of_date_str = request.args.get('asOfDate')
query = OrderDetail.query
.with_entities(Order.product_id.label('product_id'),
Account.name.label('account_name'),
Account.description.label('account_description'),
func.sum(OrderDetail.size).label('total_size')) \
.group_by('product_id', 'account_name', 'account_description')
if as_of_date_str:
as_of_date = datetime.strptime(as_of_date_str, '%Y-%m-%d')
query = query.join(OrderDetail.order).filter(Order.timestamp <= as_of_date)
result = schemas.product_size_schema.dump(query).data
json_string = json.dumps(result, cls=DefaultJsonEncoder)
return Response(response=json_string, status=200, mimetype="application/json")
我的问题是
- 如何修复查询,使其表现得像上面的 SQL 查询?
- 这大概就是 "correct" 做事的方式吗?我是 SQLAlchemy 和 Flask 的新手,所以我不确定正常的处理方式。例如,为每一列添加标签,然后按这些字符串进行分组似乎有点不对劲——我本来希望能够按我的模型 类 上的列属性进行分组。在尝试过滤加入的 table 时也调用
join
似乎是多余的——那个加入不是已经在db.relationship
属性 中编码了吗?
我的模型和架构如下。
"""models.py"""
class Account(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
name = db.Column('Name', db.Unicode(250))
description = db.Column('Description', db.Unicode)
class Order(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
product_id = db.Column('ProductId', db.Integer)
timestamp = db.Column('TradeTimestamp', db.DateTime)
class OrderDetail(db.Model):
_id = db.Column('Id', db.Integer, primary_key=True)
order_id = db.Column('OrderId', db.ForeignKey('Order.Id'), index=True)
account_id = db.Column('AccountId', db.ForeignKey('Account.Id'), index=True)
size = db.Column('Size', db.Numeric(19, 6), nullable=False)
account = db.relationship('Account', primaryjoin='OrderDetail.account_id == Account._id', backref='order_details', lazy='joined')
order = db.relationship('Trade', primaryjoin='OrderDetail.order_id == Order._id', backref='order_details', lazy='joined')
"""schemas.py"""
class ProductSizeSchema(ma.ModelSchema):
"""Schema for serializing product size objects"""
product_id = fields.Int()
account_name = fields.Str()
account_description = fields.Str()
total_size = fields.Decimal(19, 6)
product_size_schema = ProductSizeSchema()
product_sizes_schema = ProductSizeSchema(many=True)
看了一圈,发现如果不方便,不使用Flask-SQLAlchemy查询方法(例如MyModel.query.<something>
)也没什么大不了的。所以我只是以正常的 SQLAlchemy 方式使用 db.session.query()
。
query = db.session \
.query(Order.instrument_id,
Account.name.label("account_name"),
Account.description.label("account_description"),
func.sum(OrderDetail.size).label("total_size")) \
.join(OrderDetail.order) \
.join(OrderDetail.account) \
.group_by(Order.instrument_id, Account.name, Account.description)
if as_of_date_str:
as_of_date = datetime.strptime(as_of_date_str, '%Y-%m-%d')
query = query.filter(Order.timestamp <= as_of_date)