来自线程 Flask 的安全数据库查询 (peewee)

Safe database query (peewee) from thread Flask

我有几个简单的任务最多可能需要 20 秒才能完成,所以我决定使用单独的线程来完成它们。我希望线程完成工作并使用结果更新数据库。

虽然它有效(还没有例外)我缺乏对 Flask 内部结构的理解以及它如何与 WSGI 服务器一起工作。我不太确定在一定数量的并行请求上它不会以某些数据库访问错误结束。

简化代码:

from time import time, sleep
from threading import Thread
from peewee import *
from playhouse.shortcuts import model_to_dict
from flask import Flask, abort, jsonify

db = SqliteDatabase("test.db")
Flask(__name__)

class Task(Model):
    status = IntegerField(default=0)
    result = TextField(null=True)

    class Meta:
        database = db

def do_smth(task_id):
    start = time()
    sleep(10)
    # DATABASE UPDATE HERE
    Task.update({Task.status: 1, Task.result: f"{start} - {time()}"})\
        .where(Task.id == task_id).execute()

@app.route("/new")
def new_task():
    try:
        task = Task.create()
    except IntegrityError:
        abort(500)
    else:
        Thread(target=do_smth, args=(task.id,)).start()

        return jsonify(model_to_dict(task))

@app.route("/get/<int:task_id>")
def get_task(task_id):
    try:
        task = Task.get(Task.id == task_id)
    except Task.DoesNotExist:
        abort(404)
    else:
        return jsonify(model_to_dict(task))

@app.before_request
def before_request():
    db.connect()

@app.after_request
def after_request(response):
    db.close()
    return response

if __name__ == "__main__":
    with db:
        db.create_tables([Task])

    app.run(host="127.0.0.1", port=5000)

peewee tutorial I added custom Flask.before_request and Flask.after_request 中建议的那样打开和关闭数据库连接。

所以问题是如何从单独的线程安全地更新数据库?我有一个想法添加路由,它将更新数据库并从线程发送请求,但我发现它有点愚蠢。

P.S. 我已经尽力做到准确,但如果有什么不清楚的地方,我会尽力澄清,只是评论区提问

这是个好问题:

how to update database from separate thread safely?

对于 Sqlite,您必须记住,它一次只允许一位作者。因此,您必须仔细管理您的连接,以确保您只在必须时才执行写入 txn,并确保在完成后立即提交。

由于您在请求的生命周期内打开和关闭数据库,并且 运行 您的数据库操作在单独的线程中进行,因此您应该可以进行少量操作(100? ).我认为我要注意的主要事情是,在您的任务主体期间,请确保您仅在尽可能短的时间内保持该写入 txn 处于打开状态:

def do_smth(task_id):
    # open a database connection. it will be read-only for now.
    with db.connection_context():
        start = time()
        sleep(10)
        with db.atomic() as txn:  # here is write tx, keep this brief!
            Task.update({Task.status: 1, Task.result: f"{start} - {time()}"})\
                .where(Task.id == task_id).execute()

请参阅交易的第一部分:https://charlesleifer.com/blog/going-fast-with-sqlite-and-python/

要更激进的方法,您可以试试这个:https://charlesleifer.com/blog/multi-threaded-sqlite-without-the-operationalerrors/