来自 Flask 的 RQ 任务不断添加到队列中

RQ task from Flask keeps adding to queue

我的设置是:

目标是通过点击 Flask 应用启动 RQ 任务。

Redis 似乎 运行 不错,rq worker(由“rq worker”启动)也是如此

简单的任务会排队并完成。 简单的子处理“ls”task2 运行良好。

但是我的子处理 python 没有 !

烧瓶python代码(ex1.py):

from flask import Flask, request
import redis
from rq import Queue

import time 

app = Flask(__name__)

r = redis.Redis() #can use password
q = Queue(connection=r)

def task2():
    import subprocess
    result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output

def pyv():
    import subprocess
    result = subprocess.run(['python', '--version'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output


@app.route('/t2')
def t2():
    job = q.enqueue(task2)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'

@app.route('/pyv')
def pyv():
    job = q.enqueue(pyv)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'


@app.route('/get_tasks')
def get_tasks():
    q_len = len(q)
    return f'{q_len} tasks in the queue'

if __name__ == '__main__':
    app.run()

当我点击端点“/pyv”时,我的“rq worker”window 的输出变得异常,似乎正在添加项目(???)...这是一个示例.. .

17:57:33 default: ex1.pyv() (5378aa3c-9c01-4b42-ad84-f22021de1983)
17:57:34 default: Job OK (5378aa3c-9c01-4b42-ad84-f22021de1983)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (0d1edd70-9916-4ffc-bf45-692c60bbc327)
17:57:34 default: Job OK (0d1edd70-9916-4ffc-bf45-692c60bbc327)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (99402ee1-909e-474b-b410-be0dfff3f9ea)
17:57:34 default: Job OK (99402ee1-909e-474b-b410-be0dfff3f9ea)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (d049328d-e5d9-4767-b3f0-3356aff3d59b)
17:57:34 default: Job OK (d049328d-e5d9-4767-b3f0-3356aff3d59b)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (d2cb2ab8-86fe-43e3-8bb9-dc3576672d60)
17:57:34 default: Job OK (d2cb2ab8-86fe-43e3-8bb9-dc3576672d60)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (00e8ccbf-2a39-48e6-bd1c-27e6ca982cdc)
17:57:34 default: Job OK (00e8ccbf-2a39-48e6-bd1c-27e6ca982cdc)

...等等...它不会停止,直到我 Cntl+c 并重新启动 docker Redis。

有什么想法吗? 我猜想使用这个框架启动这样的命令行应用程序应该没问题吧?这是我第一次尝试 RQ+Flask。我的最终目标是完成一个使用 Tensorflow 的繁重的 python 命令行应用程序。我会将我的工作人员限制为 2 个,并且我已确保该应用程序仅使用我 GPU 内存的一半......所有这些都按原样正常工作。

但首先,我需要解决这个问题! --谢谢。

“答案”是我两次使用相同的函数名称 (pyv)。

def pyv():
    import subprocess
    result = subprocess.run(['python', '--version'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output

@app.route('/pyv')
def pyv():
    job = q.enqueue(pyv)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'

所以要点是不要将任务函数命名为与烧瓶函数相同的名称!