SQLAlchemy 查询 运行 两次,仅在执行时间长的单独线程上
SQLAlchemy queries running twice, only on a separate thread with long execution time
我的应用程序创建了一个 Flask 应用程序以及一个后台进程,它确实与我的 MySQL 数据库(通过 SQLAlchemy)一起工作:
from task_manager import TaskManager
# Session is a sessionmaker created earlier
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
app.run(debug=True, host='0.0.0.0', port=5000)
只要此进程找到可用任务(这是在 scheduler_loop 中 运行 在单独的线程中),它就会做一些工作:
with db_session(self.Session) as session:
task = session.query(Task).filter(or_(Task.date == None, Task.date <= datetime.now())).order_by(Task.priority).first()
if task is not None:
if task.type == "create_paper_task":
self.create_paper(session, task.paper_title)
elif task.type == "update_citations_task":
self.update_citations(session, task.paper)
session.delete(task)
...
def create_paper(self, session, paper_title):
...
# For the purposes of testing, I replaced a long API call with this sleep.
time.sleep(3)
paper = Paper(paper_title, year)
paper.citations.append(Citation(citations, datetime.now()))
session.add(paper)
如果我尝试使用此代码,SQLAlchemy 查询是 运行 两次。创建了两个 Paper 对象,我得到了这个错误(大概是任务被删除了两次):
/app/task_manager.py:17: SAWarning: DELETE statement on table 'create_paper_task' expected to delete 1 row(s); 0 were matched. Please set confirm_deleted_rows=False within the mapper configuration to prevent this warning.
实际代码本身不是 运行ning 两次,而且绝对没有多个调度程序线程 运行ning:我已经使用 print 语句对此进行了测试。
现在,最奇怪的是这个问题只发生在
- 执行过程中等待了很长时间。 time.sleep去掉就没有问题,
- Flask 应用 运行ning 并且调度程序循环 运行ning 在单独的线程中。如果 Flask 应用程序不是 运行ning,或者 scheduler_loop 在主线程中是 运行ning(所以显然 Flask 应用程序不是 运行ning),那么没问题。
此外,在我对此进行测试时根本没有使用 Flask 应用程序,所以这不是问题所在。
当你设置debug=True
时,Flask的app.run
函数会运行你的初始化代码两次。这是 Flask 可以检测代码更改并根据需要动态重启的方式的一部分。不利的一面是,这会导致您的线程 运行 两次,这反过来会在读取和执行您的任务时产生竞争条件,这确实只会在任务花费足够长的时间让第二个线程开始工作时才会出现。
有关正在发生的事情的更多详细信息,请参阅此 question/answer:Why does running the Flask dev server run itself twice?
为避免这种情况,您可以添加代码来避免第二次执行,但这有一个限制,即修改代码的 auto-reloading 功能将不再起作用。一般来说,最好使用 Celery 之类的东西来处理任务执行,而不是构建自己的解决方案。但是,如链接答案中所述,您可以使用
from werkzeug.serving import is_running_from_reloader
if is_running_from_reloader():
from task_manager import TaskManager
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
除非您处于第二个(重新加载的)进程中,否则不会创建您的线程。 请注意,如果您删除 debug=True
.
,这将完全阻止您的线程执行
我的应用程序创建了一个 Flask 应用程序以及一个后台进程,它确实与我的 MySQL 数据库(通过 SQLAlchemy)一起工作:
from task_manager import TaskManager
# Session is a sessionmaker created earlier
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
app.run(debug=True, host='0.0.0.0', port=5000)
只要此进程找到可用任务(这是在 scheduler_loop 中 运行 在单独的线程中),它就会做一些工作:
with db_session(self.Session) as session:
task = session.query(Task).filter(or_(Task.date == None, Task.date <= datetime.now())).order_by(Task.priority).first()
if task is not None:
if task.type == "create_paper_task":
self.create_paper(session, task.paper_title)
elif task.type == "update_citations_task":
self.update_citations(session, task.paper)
session.delete(task)
...
def create_paper(self, session, paper_title):
...
# For the purposes of testing, I replaced a long API call with this sleep.
time.sleep(3)
paper = Paper(paper_title, year)
paper.citations.append(Citation(citations, datetime.now()))
session.add(paper)
如果我尝试使用此代码,SQLAlchemy 查询是 运行 两次。创建了两个 Paper 对象,我得到了这个错误(大概是任务被删除了两次):
/app/task_manager.py:17: SAWarning: DELETE statement on table 'create_paper_task' expected to delete 1 row(s); 0 were matched. Please set confirm_deleted_rows=False within the mapper configuration to prevent this warning.
实际代码本身不是 运行ning 两次,而且绝对没有多个调度程序线程 运行ning:我已经使用 print 语句对此进行了测试。
现在,最奇怪的是这个问题只发生在
- 执行过程中等待了很长时间。 time.sleep去掉就没有问题,
- Flask 应用 运行ning 并且调度程序循环 运行ning 在单独的线程中。如果 Flask 应用程序不是 运行ning,或者 scheduler_loop 在主线程中是 运行ning(所以显然 Flask 应用程序不是 运行ning),那么没问题。
此外,在我对此进行测试时根本没有使用 Flask 应用程序,所以这不是问题所在。
当你设置debug=True
时,Flask的app.run
函数会运行你的初始化代码两次。这是 Flask 可以检测代码更改并根据需要动态重启的方式的一部分。不利的一面是,这会导致您的线程 运行 两次,这反过来会在读取和执行您的任务时产生竞争条件,这确实只会在任务花费足够长的时间让第二个线程开始工作时才会出现。
有关正在发生的事情的更多详细信息,请参阅此 question/answer:Why does running the Flask dev server run itself twice?
为避免这种情况,您可以添加代码来避免第二次执行,但这有一个限制,即修改代码的 auto-reloading 功能将不再起作用。一般来说,最好使用 Celery 之类的东西来处理任务执行,而不是构建自己的解决方案。但是,如链接答案中所述,您可以使用
from werkzeug.serving import is_running_from_reloader
if is_running_from_reloader():
from task_manager import TaskManager
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
除非您处于第二个(重新加载的)进程中,否则不会创建您的线程。 请注意,如果您删除 debug=True
.