使用 python 的多重处理在 celery worker 和 Flask 之间共享列表
Shared list among celery workers and Flask using python's multiprocessing
我正在构建一个 Flask 应用程序,它依赖于 Celery 来处理一些较长的 运行 任务。一旦完成处理,每个任务基本上都会将一个字典附加到一个共享列表中——这个列表由 celery worker 和 Flask 应用程序的路由共享。 Flask 组件本质上由一组路由组成,用于检索共享列表的内容并修改元素的顺序。
我想我已经使用 Python 的多处理模块中的管理器在 Celery 工作人员之间成功共享了列表。但是,Flask 应用程序看不到对此列表所做的更改。这是一个说明问题的最小应用程序:
import os
import json
from flask import Flask
from multiprocessing import Manager
from celery import Celery
application = Flask(__name__)
redis_url = os.environ.get('REDIS_URL')
if redis_url is None:
redis_url = 'redis://localhost:6379/0'
# Set the secret key to enable cookies
application.secret_key = 'some secret key'
application.config['SESSION_TYPE'] = 'filesystem'
# Redis and Celery configuration
application.config['BROKER_URL'] = redis_url
application.config['CELERY_RESULT_BACKEND'] = redis_url
celery = Celery(application.name, broker=redis_url)
celery.conf.update(BROKER_URL=redis_url,
CELERY_RESULT_BACKEND=redis_url)
manager = Manager()
shared_queue = manager.list() # THIS IS THE SHARED LIST
@application.route("/submit", methods=['GET'])
def submit_song():
add_song_to_queue.delay()
return 'Added a song to the queue'
@application.route("/playlist", methods=['GET', 'POST'])
def get_playlist():
playlist = []
i = 0
queue_size = len(shared_queue)
while i < queue_size:
print(shared_queue[i])
playlist.append(shared_queue[i])
return json.dumps(playlist)
@celery.task
def add_song_to_queue():
shared_queue.append({'some':'data!'})
print(len(shared_queue))
if __name__ == "__main__":
application.run(host='0.0.0.0', debug=True)
在 celery 日志中我可以清楚地看到字典被附加到列表中,并且列表的大小增加了。但是,当我在浏览器上访问 /playlist 路由时,我总是得到一个空列表。
有谁知道如何让所有工作人员和 Flask 应用程序共享该列表?
我找到了一个解决方案,离开了 Celery,而是使用 multiprocessing.Pool 作为任务队列,并通过管理器共享内存,如问题中的示例代码所示。这个 link 有一个很好的例子说明如何将这个解决方案与 Flask 集成:http://gouthamanbalaraman.com/blog/python-multiprocessing-as-a-task-queue.html
from multiprocessing import Pool
from flask import Flask
app = Flask(__name__)
_pool = None
def expensive_function(x):
# import packages that is used in this function
# do your expensive time consuming process
return x*x
@app.route('/expensive_calc/<int:x>')
def route_expcalc(x):
f = _pool.apply_async(expensive_function,[x])
r = f.get(timeout=2)
return 'Result is %d'%r
if __name__=='__main__':
_pool = Pool(processes=4)
try:
# insert production server deployment code
app.run()
except KeyboardInterrupt:
_pool.close()
_pool.join()
我正在构建一个 Flask 应用程序,它依赖于 Celery 来处理一些较长的 运行 任务。一旦完成处理,每个任务基本上都会将一个字典附加到一个共享列表中——这个列表由 celery worker 和 Flask 应用程序的路由共享。 Flask 组件本质上由一组路由组成,用于检索共享列表的内容并修改元素的顺序。
我想我已经使用 Python 的多处理模块中的管理器在 Celery 工作人员之间成功共享了列表。但是,Flask 应用程序看不到对此列表所做的更改。这是一个说明问题的最小应用程序:
import os
import json
from flask import Flask
from multiprocessing import Manager
from celery import Celery
application = Flask(__name__)
redis_url = os.environ.get('REDIS_URL')
if redis_url is None:
redis_url = 'redis://localhost:6379/0'
# Set the secret key to enable cookies
application.secret_key = 'some secret key'
application.config['SESSION_TYPE'] = 'filesystem'
# Redis and Celery configuration
application.config['BROKER_URL'] = redis_url
application.config['CELERY_RESULT_BACKEND'] = redis_url
celery = Celery(application.name, broker=redis_url)
celery.conf.update(BROKER_URL=redis_url,
CELERY_RESULT_BACKEND=redis_url)
manager = Manager()
shared_queue = manager.list() # THIS IS THE SHARED LIST
@application.route("/submit", methods=['GET'])
def submit_song():
add_song_to_queue.delay()
return 'Added a song to the queue'
@application.route("/playlist", methods=['GET', 'POST'])
def get_playlist():
playlist = []
i = 0
queue_size = len(shared_queue)
while i < queue_size:
print(shared_queue[i])
playlist.append(shared_queue[i])
return json.dumps(playlist)
@celery.task
def add_song_to_queue():
shared_queue.append({'some':'data!'})
print(len(shared_queue))
if __name__ == "__main__":
application.run(host='0.0.0.0', debug=True)
在 celery 日志中我可以清楚地看到字典被附加到列表中,并且列表的大小增加了。但是,当我在浏览器上访问 /playlist 路由时,我总是得到一个空列表。
有谁知道如何让所有工作人员和 Flask 应用程序共享该列表?
我找到了一个解决方案,离开了 Celery,而是使用 multiprocessing.Pool 作为任务队列,并通过管理器共享内存,如问题中的示例代码所示。这个 link 有一个很好的例子说明如何将这个解决方案与 Flask 集成:http://gouthamanbalaraman.com/blog/python-multiprocessing-as-a-task-queue.html
from multiprocessing import Pool
from flask import Flask
app = Flask(__name__)
_pool = None
def expensive_function(x):
# import packages that is used in this function
# do your expensive time consuming process
return x*x
@app.route('/expensive_calc/<int:x>')
def route_expcalc(x):
f = _pool.apply_async(expensive_function,[x])
r = f.get(timeout=2)
return 'Result is %d'%r
if __name__=='__main__':
_pool = Pool(processes=4)
try:
# insert production server deployment code
app.run()
except KeyboardInterrupt:
_pool.close()
_pool.join()