运行 Flask 应用程序中的 APScheduler 作为后台函数而不干扰其他线程
Running APScheduler within an Flask App as background function without disturbing other threads
下面是我的 init.py 函数,我想在其中安排一个后台任务 运行 每个星期一 10:30 上午。
代码没有显示错误,但也不起作用。
from flask import Flask, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlite3 import Connection as SQLite3Connection
from apscheduler.schedulers.background import BackgroundScheduler
from Scripts.File_merge import File_merge_auto
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
sched = BackgroundScheduler(daemon=True)
sched.add_job(File_merge_auto.sirmerge(),'cron',day_of_week='mon',hour=10,minute=30)
sched.start()
def create_app():
"""Application-factory pattern"""
# Initialize our core application
app = Flask(__name__)
if app.config['ENV'] == 'development':
app.config.from_object('config.DevelopmentConfig')
else:
app.config.from_object('config.ProductionConfig')
# Error Handling
@app.errorhandler(404)
def page_not_found(error):
return render_template('error_pages/404.html'), 404
@app.errorhandler(500)
def internal_error(error):
return render_template('error_pages/500.html'), 500
@app.errorhandler(400)
def bad_request(error):
return render_template('error_pages/400.html'), 400
@app.errorhandler(401)
def unauthorized(error):
return render_template('error_pages/401.html'), 401
@app.errorhandler(403)
def forbidden(error):
return render_template('error_pages/403.html'), 403
@app.errorhandler(405)
def method_not_allowed(error):
return render_template('error_pages/405.html'), 405
@app.errorhandler(503)
def service_unavailable(error):
return render_template('error_pages/503.html'), 503
with app.app_context():
# Import Routes
from .views.login import login_bp
from .views.home import home_bp
from .views.admin import admin_bp
from .views.flash_dashboard import flash_dashboard_bp
from .views.blank_dashboard1 import blank_dashboard1_bp
from .views.blank_dashboard2 import blank_dashboard2_bp
# Register Blueprints
app.register_blueprint(login_bp)
app.register_blueprint(home_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(flash_dashboard_bp)
app.register_blueprint(blank_dashboard1_bp)
app.register_blueprint(blank_dashboard2_bp)
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
return app
谁能告诉我哪里做错了?
我基本上需要 运行 一个后台任务来合并驱动器中的几个文件,然后我的其他应用程序在交互时根据需要使用这些文件。
您正在调用 File_merge_auto.sirmerge()
并将其 return 值传递给 sched.add_job()
。因此,您的代码等同于:
value = File_merge_auto.sirmerge()
sched.add_job(value,'cron',day_of_week='mon',hour=10,minute=30)
这可能不是您想要的,因此您需要将函数(而不是它的 return 值)传递给 add_job()
:
sched.add_job(File_merge_auto.sirmerge,'cron',day_of_week='mon',hour=10,minute=30)
如果您还有任何问题,请参阅文档的 troubleshooting section。
下面是我的 init.py 函数,我想在其中安排一个后台任务 运行 每个星期一 10:30 上午。
代码没有显示错误,但也不起作用。
from flask import Flask, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlite3 import Connection as SQLite3Connection
from apscheduler.schedulers.background import BackgroundScheduler
from Scripts.File_merge import File_merge_auto
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
sched = BackgroundScheduler(daemon=True)
sched.add_job(File_merge_auto.sirmerge(),'cron',day_of_week='mon',hour=10,minute=30)
sched.start()
def create_app():
"""Application-factory pattern"""
# Initialize our core application
app = Flask(__name__)
if app.config['ENV'] == 'development':
app.config.from_object('config.DevelopmentConfig')
else:
app.config.from_object('config.ProductionConfig')
# Error Handling
@app.errorhandler(404)
def page_not_found(error):
return render_template('error_pages/404.html'), 404
@app.errorhandler(500)
def internal_error(error):
return render_template('error_pages/500.html'), 500
@app.errorhandler(400)
def bad_request(error):
return render_template('error_pages/400.html'), 400
@app.errorhandler(401)
def unauthorized(error):
return render_template('error_pages/401.html'), 401
@app.errorhandler(403)
def forbidden(error):
return render_template('error_pages/403.html'), 403
@app.errorhandler(405)
def method_not_allowed(error):
return render_template('error_pages/405.html'), 405
@app.errorhandler(503)
def service_unavailable(error):
return render_template('error_pages/503.html'), 503
with app.app_context():
# Import Routes
from .views.login import login_bp
from .views.home import home_bp
from .views.admin import admin_bp
from .views.flash_dashboard import flash_dashboard_bp
from .views.blank_dashboard1 import blank_dashboard1_bp
from .views.blank_dashboard2 import blank_dashboard2_bp
# Register Blueprints
app.register_blueprint(login_bp)
app.register_blueprint(home_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(flash_dashboard_bp)
app.register_blueprint(blank_dashboard1_bp)
app.register_blueprint(blank_dashboard2_bp)
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
return app
谁能告诉我哪里做错了?
我基本上需要 运行 一个后台任务来合并驱动器中的几个文件,然后我的其他应用程序在交互时根据需要使用这些文件。
您正在调用 File_merge_auto.sirmerge()
并将其 return 值传递给 sched.add_job()
。因此,您的代码等同于:
value = File_merge_auto.sirmerge()
sched.add_job(value,'cron',day_of_week='mon',hour=10,minute=30)
这可能不是您想要的,因此您需要将函数(而不是它的 return 值)传递给 add_job()
:
sched.add_job(File_merge_auto.sirmerge,'cron',day_of_week='mon',hour=10,minute=30)
如果您还有任何问题,请参阅文档的 troubleshooting section。