你能 运行 并行查询多个 Flask SQLAlchemy 吗?

Can you run multiple Flask SQLAlchemy queries in parallel?

在我当前的设置中,如果我执行五个 100 毫秒的查询,它们总共需要 500 毫秒。有什么办法可以让它们并行 运行 这样只需要 100 毫秒吗?

我 运行ning Flask 落后于 nginx/uwsgi,但可以更改其中任何一项。

具体来说,我希望能够从中转换代码:

result_1 = db.session.query(...).all()
result_2 = db.session.query(...).all()
result_3 = db.session.query(...).all()

像这样:

result_1, result_2, result_3 = run_in_parallel([
  db.session.query(...).all(),
  db.session.query(...).all(),
  db.session.query(...).all(),
])

有没有办法用 Flask 和 SQLAlchemy 做到这一点?

一般的并行性

一般来说,如果您想 运行 并行执行任务,您可以使用线程或进程。在 python 中,线程非常适合 I/O 绑定的任务(这意味着它们花费的时间用于等待其他资源 - 等待您的数据库、磁盘或远程网络服务器),和流程非常适合 CPU 绑定的任务(数学和其他计算密集型任务)。

concurrent.futures

在你的情况下,线程是理想的。 Python 有一个 threading module that you can look into, but there's a fair bit to unpack: safely using threads usually means limiting the number of threads that can be run by using a pool of threads and a queue for tasks. For that reason I much prefer concurrent.futures 库,它提供围绕 threading 的包装器,为您提供易于使用的界面并为您处理很多复杂性。

当使用 concurrent.futures 时,您创建一个执行器,然后将任务连同参数列表一起提交给它。而不是像这样调用函数:

# get 4 to the power of 5
result = pow(4, 5)
print(result)

您提交函数及其参数:

您通常会像这样使用 concurrent.futures:

from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
future = executor.submit(pow, 4, 5)
print(future.result())

注意我们如何不使用 pow() 调用函数,我们提交函数对象 pow 执行器将在线程内调用。

为了更方便地在 Flask 中使用 concurrent.futures 库,您可以使用 flask-executor,它的工作方式与任何其他 Flask 扩展一样。它还处理后台任务需要访问后台任务中的 Flask 上下文局部变量(如 appsessiongrequest 对象)的边缘情况。完全披露:我编写并维护了这个库。

(有趣的事实:concurrent.futures 包装线程和多处理,使用相同的 API - 所以如果你发现自己在未来需要为 CPU 绑定任务进行多处理,你可以使用以相同的方式使用相同的库来实现您的目标)

综合起来

以下是使用 flask-executor 到 运行 并行 SQLAlchemy 任务的情况:

from flask_executor import Executor
# ... define your `app` and `db` objects

executor = Executor(app)   

# run the same query three times in parallel and collect all the results
futures = []
for i in range(3):
    # note the lack of () after ".all", as we're passing the function object, not calling it ourselves
    future = executor.submit(db.session.query(MyModel).all) 
    futures.append(future)

for future in futures:
    print(future.result())

繁荣,你现在有 运行 个并行的 Flask SQLAlchemy 查询。