从 Flask 请求中读取全局 collections.deque 安全吗?
Is reading a global collections.deque from within a Flask request safe?
我有一个 Flask 应用程序,它应该向指定路径上的用户显示长 运行 函数的结果。结果大约每小时更改一次。为了避免用户不得不等待结果,我想将它缓存在应用程序的某个地方并在后台以特定时间间隔(例如每小时)重新计算它,这样用户请求就不必等待long 运行 计算函数。
我想出解决这个问题的想法如下,但是,我不完全确定这是否真的 "safe" 在具有多线程甚至多进程的生产环境中执行网络服务器,例如 waitress
、eventlet
、gunicorn
或其他。
为了在后台重新计算结果,我使用了 APScheduler library 中的 BackgroundScheduler
。
然后将结果左附加到一个 collections.deque 对象中,该对象注册为模块范围的变量(因为据我所知,在 Flask 应用程序中保存应用程序范围的全局变量没有更好的可能性?!)。由于 deque 的最大大小设置为 2,旧结果将在新结果进入时从 deque 的右侧弹出。
A Flask 视图现在 returns deque[0]
给请求者,它应该总是最新的结果。我决定 deque
而不是 Queue
,因为后者没有内置的可能性来读取第一项而不删除它。
因此,可以保证没有用户需要等待结果,因为旧结果只会在新结果出现的那一刻从 "cache" 中消失。
请参阅下面的最小示例。当 运行 脚本并点击 http://localhost:5000
时,可以看到正在运行的缓存 - "Job finished at" 不应迟于 10 秒加上一些非常短的时间用于在 [=42] 之后重新计算它=],仍然应该永远不必等待 time.sleep(5)
秒从工作功能到请求 returns。
对于给定的要求,这是否是一个有效的实现,它也可以在生产就绪的 WSGI 服务器设置中工作,还是应该以不同的方式完成?
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque
# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)
# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
print('complicated long running job started...')
time.sleep(5)
job_finished_at = datetime.datetime.now()
deque.appendleft(job_finished_at)
# a function setting up the scheduler
def start_scheduler():
scheduler = BackgroundScheduler()
scheduler.add_job(some_long_running_job,
trigger='interval',
seconds=10,
next_run_time=datetime.datetime.utcnow(),
id='1',
name='Some Job name'
)
scheduler.start()
# a flask application
app = Flask(__name__)
# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
current_time = datetime.datetime.now()
job_finished_at = deque[0]
return '''
Current time is: {0} <br>
Job finished at: {1}
'''.format(current_time, job_finished_at)
# start the scheduler and flask server
if __name__ == '__main__':
start_scheduler()
app.run()
如果 运行 多个进程,线程安全是不够的:
尽管 collections.deque
是线程安全的:
Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.
来源:https://docs.python.org/3/library/collections.html#collections.deque
根据您的配置,您的网络服务器可能 运行 在多个进程中有多个工作人员,因此每个进程都有自己的对象实例。
即使只有一个 worker,线程安全也可能不够:
您可能选择了异步工作器类型。异步 worker 不知道什么时候可以安全地让步,并且必须保护您的代码免受以下情况的影响:
- 请求 1 的 Worker 读取值
a
并产生
- 请求 2 的 Worker 也读取值
a
,写入 a + 1
并产生
- 请求 1 的工作程序写入值
a + 1
,即使它应该是 a + 1 + 1
可能的解决方案:
使用 Flask 应用程序之外的东西来存储数据。这可以是数据库,在这种情况下最好是像 Redis 这样的内存数据库。或者,如果您的 worker 类型与 multiprocessing
模块兼容,您可以尝试使用 multiprocessing.managers.BaseManager
将您的 Python 对象提供给所有 worker 进程。
- How can I provide shared state to my Flask app with multiple workers without depending on additional software?
我有一个 Flask 应用程序,它应该向指定路径上的用户显示长 运行 函数的结果。结果大约每小时更改一次。为了避免用户不得不等待结果,我想将它缓存在应用程序的某个地方并在后台以特定时间间隔(例如每小时)重新计算它,这样用户请求就不必等待long 运行 计算函数。
我想出解决这个问题的想法如下,但是,我不完全确定这是否真的 "safe" 在具有多线程甚至多进程的生产环境中执行网络服务器,例如 waitress
、eventlet
、gunicorn
或其他。
为了在后台重新计算结果,我使用了 APScheduler library 中的 BackgroundScheduler
。
然后将结果左附加到一个 collections.deque 对象中,该对象注册为模块范围的变量(因为据我所知,在 Flask 应用程序中保存应用程序范围的全局变量没有更好的可能性?!)。由于 deque 的最大大小设置为 2,旧结果将在新结果进入时从 deque 的右侧弹出。
A Flask 视图现在 returns deque[0]
给请求者,它应该总是最新的结果。我决定 deque
而不是 Queue
,因为后者没有内置的可能性来读取第一项而不删除它。
因此,可以保证没有用户需要等待结果,因为旧结果只会在新结果出现的那一刻从 "cache" 中消失。
请参阅下面的最小示例。当 运行 脚本并点击 http://localhost:5000
时,可以看到正在运行的缓存 - "Job finished at" 不应迟于 10 秒加上一些非常短的时间用于在 [=42] 之后重新计算它=],仍然应该永远不必等待 time.sleep(5)
秒从工作功能到请求 returns。
对于给定的要求,这是否是一个有效的实现,它也可以在生产就绪的 WSGI 服务器设置中工作,还是应该以不同的方式完成?
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque
# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)
# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
print('complicated long running job started...')
time.sleep(5)
job_finished_at = datetime.datetime.now()
deque.appendleft(job_finished_at)
# a function setting up the scheduler
def start_scheduler():
scheduler = BackgroundScheduler()
scheduler.add_job(some_long_running_job,
trigger='interval',
seconds=10,
next_run_time=datetime.datetime.utcnow(),
id='1',
name='Some Job name'
)
scheduler.start()
# a flask application
app = Flask(__name__)
# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
current_time = datetime.datetime.now()
job_finished_at = deque[0]
return '''
Current time is: {0} <br>
Job finished at: {1}
'''.format(current_time, job_finished_at)
# start the scheduler and flask server
if __name__ == '__main__':
start_scheduler()
app.run()
如果 运行 多个进程,线程安全是不够的:
尽管 collections.deque
是线程安全的:
Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.
来源:https://docs.python.org/3/library/collections.html#collections.deque
根据您的配置,您的网络服务器可能 运行 在多个进程中有多个工作人员,因此每个进程都有自己的对象实例。
即使只有一个 worker,线程安全也可能不够:
您可能选择了异步工作器类型。异步 worker 不知道什么时候可以安全地让步,并且必须保护您的代码免受以下情况的影响:
- 请求 1 的 Worker 读取值
a
并产生 - 请求 2 的 Worker 也读取值
a
,写入a + 1
并产生 - 请求 1 的工作程序写入值
a + 1
,即使它应该是a + 1 + 1
可能的解决方案:
使用 Flask 应用程序之外的东西来存储数据。这可以是数据库,在这种情况下最好是像 Redis 这样的内存数据库。或者,如果您的 worker 类型与 multiprocessing
模块兼容,您可以尝试使用 multiprocessing.managers.BaseManager
将您的 Python 对象提供给所有 worker 进程。
- How can I provide shared state to my Flask app with multiple workers without depending on additional software?