RQPython"Working outside of application context"
RQ Python "Working outside of application context"
我正在尝试使用 Redis 和 RQ 来设置发送电子邮件的任务,但是,"RQ Worker" 在使用向 q.enqueue 外部发送电子邮件的功能时返回运行时错误工作正常.
app/routes.py
routes = Blueprint("routes", __name__)
r = Redis()
q = Queue(connection=r)
def sendEmail_task(recipient, message):
msg = Message("Test Email", sender=("Me", "shawkyelshazly2@gmail.com"),
recipients=[recipient])
msg.body = message
msg.send(mail)
@routes.route("/send_email", methods=["POST", "GET"])
def send_mail():
if request.method == "POST":
recipient = request.form.get('email')
message = request.form.get('message')
job = q.enqueue(sendEmail_task, recipient, message)
return redirect(url_for("routes.email_sent"))
return render_template("send_email.html")
app/__init__.py
mail = Mail()
def create_app(config_class = Config):
app = Flask(__name__)
from app.routes import routes
app.register_blueprint(routes)
app.config.from_object(Config)
with app.app_context():
mail.init_app(app)
return app
run.py
Which is outside the app folder
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(debug=True)
您可能漏掉了 app.app_context().push()
。 Flask Mega 教程就像 this 一样,但我已经在任务中完成了。
我正在尝试使用 Redis 和 RQ 来设置发送电子邮件的任务,但是,"RQ Worker" 在使用向 q.enqueue 外部发送电子邮件的功能时返回运行时错误工作正常.
app/routes.py
routes = Blueprint("routes", __name__) r = Redis() q = Queue(connection=r) def sendEmail_task(recipient, message): msg = Message("Test Email", sender=("Me", "shawkyelshazly2@gmail.com"), recipients=[recipient]) msg.body = message msg.send(mail) @routes.route("/send_email", methods=["POST", "GET"]) def send_mail(): if request.method == "POST": recipient = request.form.get('email') message = request.form.get('message') job = q.enqueue(sendEmail_task, recipient, message) return redirect(url_for("routes.email_sent")) return render_template("send_email.html")
app/__init__.py
mail = Mail() def create_app(config_class = Config): app = Flask(__name__) from app.routes import routes app.register_blueprint(routes) app.config.from_object(Config) with app.app_context(): mail.init_app(app) return app
run.py
Which is outside the app folder
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(debug=True)
您可能漏掉了 app.app_context().push()
。 Flask Mega 教程就像 this 一样,但我已经在任务中完成了。