Python:没有外部库的 Flask 简单任务队列无法正常工作
Python: Flask simple task queue without external libraries not working
我想用 Flask 做一个没有任何数据库的简单任务队列。
在最简单的版本中,我有两个端点。提交作业并检查状态。
提交作业会将请求添加到队列中,并检查状态 return 作业 ID 的状态(排队、运行ning、失败、完成)。
工作流程如下:
- 用户提交了一份工作
- 作业已添加到队列
- 用户将每 5 秒检查一次作业状态
- 每次状态检查都会触发一个函数,检查 运行ning 作业数是否小于最大作业数(来自配置)。如果数字较小,它将跨越另一个线程,作业位于队列顶部。
这是简化的代码:
app = Flask(__name__)
def finish_job(job_id):
finished.append(job_id)
last = running.pop(job_id)
last.close()
def remove_finished():
for j in list(running.keys()):
if not running[j].is_alive():
finish_job(j)
def start_jobs():
while len(running) < config.threads and len(queue_list) > 0:
print('running now', len(running))
next_job = queue.pop()
queue_list.remove(next_job[0])
start_job(*next_job)
@app.route("/Simulation", methods=['POST'])
@authenticate
def submit_job():
# create id
job_id = str(uuid.uuid4())
job_data = request.data.decode('utf-8')
queue.append((job_id, job_data))
queue_list.add(job_id)
return 'QUEUED', 200
@app.route("/Simulation/<uuid:job_id>", methods=['GET'])
@authenticate
def check_status(job_id: uuid):
job_id = str(job_id)
remove_finished()
start_jobs()
if job_id in running:
r = 'RUNNING'
elif job_id in queue_list:
r = 'QUEUED'
elif job_id in finished:
r = 'COMPLETED'
else:
r = 'FAILED'
return status_response(r), 200
running = {}
finished = []
queue = []
queue_list = set()
app.run()
现在的问题是,如果多个用户同时提交检查状态请求,并且只有一个空槽可用于 运行 执行一项任务,则这两个请求都会产生该作业。
有什么方法可以强制 Flask 一次只 运行 一个函数实例吗?
谢谢
经过多次搜索,我终于找到了答案。
As of Flask 1.0, the builtin WSGI server runs threaded by default.
所以,我只需要添加参数来停止线程
app.run(threaded=False)
我想用 Flask 做一个没有任何数据库的简单任务队列。 在最简单的版本中,我有两个端点。提交作业并检查状态。 提交作业会将请求添加到队列中,并检查状态 return 作业 ID 的状态(排队、运行ning、失败、完成)。 工作流程如下:
- 用户提交了一份工作
- 作业已添加到队列
- 用户将每 5 秒检查一次作业状态
- 每次状态检查都会触发一个函数,检查 运行ning 作业数是否小于最大作业数(来自配置)。如果数字较小,它将跨越另一个线程,作业位于队列顶部。
这是简化的代码:
app = Flask(__name__)
def finish_job(job_id):
finished.append(job_id)
last = running.pop(job_id)
last.close()
def remove_finished():
for j in list(running.keys()):
if not running[j].is_alive():
finish_job(j)
def start_jobs():
while len(running) < config.threads and len(queue_list) > 0:
print('running now', len(running))
next_job = queue.pop()
queue_list.remove(next_job[0])
start_job(*next_job)
@app.route("/Simulation", methods=['POST'])
@authenticate
def submit_job():
# create id
job_id = str(uuid.uuid4())
job_data = request.data.decode('utf-8')
queue.append((job_id, job_data))
queue_list.add(job_id)
return 'QUEUED', 200
@app.route("/Simulation/<uuid:job_id>", methods=['GET'])
@authenticate
def check_status(job_id: uuid):
job_id = str(job_id)
remove_finished()
start_jobs()
if job_id in running:
r = 'RUNNING'
elif job_id in queue_list:
r = 'QUEUED'
elif job_id in finished:
r = 'COMPLETED'
else:
r = 'FAILED'
return status_response(r), 200
running = {}
finished = []
queue = []
queue_list = set()
app.run()
现在的问题是,如果多个用户同时提交检查状态请求,并且只有一个空槽可用于 运行 执行一项任务,则这两个请求都会产生该作业。 有什么方法可以强制 Flask 一次只 运行 一个函数实例吗? 谢谢
经过多次搜索,我终于找到了答案。
As of Flask 1.0, the builtin WSGI server runs threaded by default.
所以,我只需要添加参数来停止线程
app.run(threaded=False)