Flask 中并发的 Apscheduler 运行 个作业
Apscheduler running jobs concurrently in Flask
在 Flask 中,我试图同时 运行 多个作业。但是我遇到了这个问题:
This typically means that you attempted to use functionality that needed
to interface with the current application object in some way. To solve
this, set up an application context with app.app_context(). See the
documentation for more information.
Job "chron (trigger: interval[0:00:05], next run at: 2020-05-19 16:03:46 IST)" raised an exception
Traceback (most recent call last):
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "c:\Users\mithi\Desktop\appengineering\server.py", line 128, in chron
return jsonify({})
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\flask\json\__init__.py", line 358, in jsonify
if current_app.config["JSONIFY_PRETTYPRINT_REGULAR"] or current_app.debug:
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\werkzeug\local.py", line 348, in __getattr__
return getattr(self._get_current_object(), name)
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\werkzeug\local.py", line 307, in _get_current_object
return self.__local()
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\flask\globals.py", line 52, in _find_app
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
这是我的计划对象:
job_defaults = {
'coalesce': False,
'max_instances': 100
}
sched = BackgroundScheduler(daemon=True, job_defaults=job_defaults)
sched.start()
我通过定义以下函数来创建多个作业:
def etl():
# read app config
with open('appConfig.json', encoding="utf-8") as json_file:
configData = json.load(json_file)
for obj in configData:
sched.add_job(chron, 'interval', seconds=5, args=[obj], id=obj)
基本上,该函数是将作业添加为对函数 "chron" 的调用,但 args 和 id 不同,这会创建唯一的作业,运行每五秒间隔一次。我在 chron 函数中遇到 app_context 问题。我读过 app_context 但仍然无法理解这个概念。
您的 "chron" 函数正在调用 Flask current_app 和 jsonify() 但您似乎还没有推送 Flask 上下文因此出现此外部应用程序上下文运行时错误。
在调用 "contexted" 函数之前,您需要推送 Flask 应用上下文。
有两种方法,
app = Flask(__name__)
# do some stuff on your app if you need
app.app_context().push()
# do some other stuff
或在您的计时功能中,
with app.app_context():
# do something
You can refer to manually pushing a context for more details
在 Flask 中,我试图同时 运行 多个作业。但是我遇到了这个问题:
This typically means that you attempted to use functionality that needed
to interface with the current application object in some way. To solve
this, set up an application context with app.app_context(). See the
documentation for more information.
Job "chron (trigger: interval[0:00:05], next run at: 2020-05-19 16:03:46 IST)" raised an exception
Traceback (most recent call last):
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "c:\Users\mithi\Desktop\appengineering\server.py", line 128, in chron
return jsonify({})
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\flask\json\__init__.py", line 358, in jsonify
if current_app.config["JSONIFY_PRETTYPRINT_REGULAR"] or current_app.debug:
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\werkzeug\local.py", line 348, in __getattr__
return getattr(self._get_current_object(), name)
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\werkzeug\local.py", line 307, in _get_current_object
return self.__local()
File "C:\Users\mithi\AppData\Local\Programs\Python\Python37\lib\site-packages\flask\globals.py", line 52, in _find_app
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
这是我的计划对象:
job_defaults = {
'coalesce': False,
'max_instances': 100
}
sched = BackgroundScheduler(daemon=True, job_defaults=job_defaults)
sched.start()
我通过定义以下函数来创建多个作业:
def etl():
# read app config
with open('appConfig.json', encoding="utf-8") as json_file:
configData = json.load(json_file)
for obj in configData:
sched.add_job(chron, 'interval', seconds=5, args=[obj], id=obj)
基本上,该函数是将作业添加为对函数 "chron" 的调用,但 args 和 id 不同,这会创建唯一的作业,运行每五秒间隔一次。我在 chron 函数中遇到 app_context 问题。我读过 app_context 但仍然无法理解这个概念。
您的 "chron" 函数正在调用 Flask current_app 和 jsonify() 但您似乎还没有推送 Flask 上下文因此出现此外部应用程序上下文运行时错误。
在调用 "contexted" 函数之前,您需要推送 Flask 应用上下文。
有两种方法,
app = Flask(__name__)
# do some stuff on your app if you need
app.app_context().push()
# do some other stuff
或在您的计时功能中,
with app.app_context():
# do something
You can refer to manually pushing a context for more details