Redis后台作业完成后如何return flask render_template?

How do I return flask render_template after the Redis background job is done?

我将此 Web 应用程序放在烧瓶中,我想在提交表单后执行一些 ML 和 AI 算法。在 Redis 和 rq 的帮助下,我是 运行 后台工作中的 ML 和 AI 算法(因为我的应用程序由 Heroku 托管,他们有这个超时问题,你必须 return 30秒)。工作完成后,我想获取算法制作的图像(一些图表)并将它们输出到网页中,但我不知道如何在工作函数中呈现模板,并从烧瓶中导入应用程序执行此操作的应用程序似乎不起作用。您对如何解决这个问题有什么想法吗?

我的代码片段来自 Flask 应用程序,它使作业排队:

def upload():
    from mlsalespred import run_model
    file = request.files['file']
    dffile = pd.read_csv(file)
    job = q.enqueue(run_model, dffile)
    return render_template("waiting.html")

我的作业函数代码片段:

def run_model(dataFrame):
    - - - - - - - - - - -
    - - some ml stuff - -
    - - - - - - - - - - -
    return render_template("uploaded.html", sales_fig = sales_fig.decode('utf8'), diff_fig = diff_fig.decode('utf8'), pred_fig = pred_fig.decode('utf8') )

提前致谢

一个基本但可行的解决方案(gist):

您可以通过从使作业排队的路由重定向,然后让元标记定期刷新该页面来做到这一点。首先导入需要的库:

from flask import Flask, redirect, url_for, render_template_string
app = Flask(__name__)

from time import sleep

from rq import Queue
from rq.job import Job
from redis import Redis

设置rq相关连接,定义函数为运行:

r = Redis(host='redisserver')
q = Queue(connection=r)

def slow_func(data):
    sleep(5)
    return 'Processed %s' % (data,)

然后定义一个5秒刷新一次的模板:

template_str='''<html>
    <head>
      {% if refresh %}
        <meta http-equiv="refresh" content="5">
      {% endif %}
    </head>
    <body>{{result}}</body>
    </html>'''

我们还将使用 flask render_template_string 为 return 插入变量的模板创建一个辅助函数。请注意,刷新默认为 False,如果未提供:

def get_template(data, refresh=False):
    return render_template_string(template_str, result=data, refresh=refresh)

现在创建一条路由,它将使我们的函数入队,获取其 rq 作业 ID,然后 return 使用 id 重定向到 result 视图。这只是在 URL 字符串中输入,但可以从任何地方获取:

@app.route('/process/<string:data>')
def process(data):
    job = q.enqueue(slow_func, data)
    return redirect(url_for('result', id=job.id))

现在让我们在 rq.Job 对象的帮助下处理实际结果。可以调整此处的逻辑,因为这将导致页面刷新除 "finished":

之外的所有值
@app.route('/result/<string:id>')
def result(id):
    job = Job.fetch(id, connection=r)
    status = job.get_status()
    if status in ['queued', 'started', 'deferred', 'failed']:
        return get_template(status, refresh=True)
    elif status == 'finished':
        result = job.result 
        # If this is a string, we can simply return it:
        return get_template(result)

如果状态为 "finished",则 job.result 将包含 slow_func 的 return 值,因此我们将其呈现在页面上。

此方法的缺点是在等待作业完成时会向服务器发出多个请求。元刷新标签可能有点不合常规。如果您从 Javascript 发送更新请求,那么 solutions 可以间隔发送 AJAX 请求,尽管这会遇到相同的多请求问题。

替代方法是使用 websockets 或 SSE 将已完成作业的结果在完成后立即流式传输到前端。

更新:2021 年 2 月 27 日

我决定试一试用作业状态更新前端的 SSE 方法。我了解到 rq 通过在作业中导入 rq.get_current_job 来本地支持更新作业中的 meta 属性,然后可以在作业刷新后从外部访问它。

查看演示代码:

带进度条的基本示例(gist):