从 Flask 请求中读取全局 collections.deque 安全吗?

Is reading a global collections.deque from within a Flask request safe?

我有一个 Flask 应用程序,它应该向指定路径上的用户显示长 运行 函数的结果。结果大约每小时更改一次。为了避免用户不得不等待结果,我想将它缓存在应用程序的某个地方并在后台以特定时间间隔(例如每小时)重新计算它,这样用户请求就不必等待long 运行 计算函数。

我想出解决这个问题的想法如下,但是,我不完全确定这是否真的 "safe" 在具有多线程甚至多进程的生产环境中执行网络服务器,例如 waitresseventletgunicorn 或其他。

为了在后台重新计算结果,我使用了 APScheduler library 中的 BackgroundScheduler

然后将结果左附加到一个 collections.deque 对象中,该对象注册为模块范围的变量(因为据我所知,在 Flask 应用程序中保存应用程序范围的全局变量没有更好的可能性?!)。由于 deque 的最大大小设置为 2,旧结果将在新结果进入时从 deque 的右侧弹出。

A Flask 视图现在 returns deque[0] 给请求者,它应该总是最新的结果。我决定 deque 而不是 Queue,因为后者没有内置的可能性来读取第一项而不删除它。

因此,可以保证没有用户需要等待结果,因为旧结果只会在新结果出现的那一刻从 "cache" 中消失。

请参阅下面的最小示例。当 运行 脚本并点击 http://localhost:5000 时,可以看到正在运行的缓存 - "Job finished at" 不应迟于 10 秒加上一些非常短的时间用于在 [=42] 之后重新计算它=],仍然应该永远不必等待 time.sleep(5) 秒从工作功能到请求 returns。

对于给定的要求,这是否是一个有效的实现,它也可以在生产就绪的 WSGI 服务器设置中工作,还是应该以不同的方式完成?

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque

# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)

# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
    print('complicated long running job started...')
    time.sleep(5)
    job_finished_at = datetime.datetime.now()
    deque.appendleft(job_finished_at)

# a function setting up the scheduler
def start_scheduler():
    scheduler = BackgroundScheduler()
    scheduler.add_job(some_long_running_job,
                      trigger='interval',
                      seconds=10,
                      next_run_time=datetime.datetime.utcnow(),
                      id='1',
                      name='Some Job name'
                      )
    scheduler.start()

# a flask application
app = Flask(__name__)

# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
    current_time = datetime.datetime.now()
    job_finished_at = deque[0]

    return '''
        Current time is: {0} <br>
        Job finished at: {1}
        '''.format(current_time, job_finished_at)

# start the scheduler and flask server
if __name__ == '__main__':
    start_scheduler()
    app.run()

如果 运行 多个进程,线程安全是不够的:

尽管 collections.deque 是线程安全的:

Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.

来源:https://docs.python.org/3/library/collections.html#collections.deque

根据您的配置,您的网络服务器可能 运行 在多个进程中有多个工作人员,因此每个进程都有自己的对象实例。


即使只有一个 worker,线程安全也可能不够:

您可能选择了异步工作器类型。异步 worker 不知道什么时候可以安全地让步,并且必须保护您的代码免受以下情况的影响:

  1. 请求 1 的 Worker 读取值 a 并产生
  2. 请求 2 的 Worker 也读取值 a,写入 a + 1 并产生
  3. 请求 1 的工作程序写入值 a + 1,即使它应该是 a + 1 + 1

可能的解决方案:

使用 Flask 应用程序之外的东西来存储数据。这可以是数据库,在这种情况下最好是像 Redis 这样的内存数据库。或者,如果您的 worker 类型与 multiprocessing 模块兼容,您可以尝试使用 multiprocessing.managers.BaseManager 将您的 Python 对象提供给所有 worker 进程。

  • How can I provide shared state to my Flask app with multiple workers without depending on additional software?