Flask,蓝图使用芹菜任务并获得循环导入
Flask, blueprints uses celery task and got cycle import
我有一个带有蓝图和 Celery 的应用程序
代码在这里:
config.py
import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or ''
SQLALCHEMY_COMMIT_ON_TEARDOWN = True
RECORDS_PER_PAGE = 40
SQLALCHEMY_DATABASE_URI = ''
CELERY_BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_RESULT_DBURI = ''
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {}
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = True
APP_HOME = ''
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://...'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
class TestConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = False
TESTING = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
class ProdConfig(Config):
DEBUG = False
WTF_CSRF_ENABLED = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...celery'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://.../celery'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
config = {
'development': DevelopmentConfig,
'default': ProdConfig,
'production': ProdConfig,
'testing': TestConfig,
}
class AppConf:
"""
Class to store current config even out of context
"""
def __init__(self):
self.app = None
self.config = {}
def init_app(self, app):
if hasattr(app, 'config'):
self.app = app
self.config = app.config.copy()
else:
raise TypeError
初始化.py:
导入 os
from flask import Flask
from celery import Celery
from config import config, AppConf
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
app_conf.init_app(app)
# Connect to Staging view
from staging.views import staging as staging_blueprint
app.register_blueprint(staging_blueprint)
return app
def make_celery(app=None):
app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
celery.conf.update(app.conf)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
tasks.py:
从应用程序导入 make_celery、app_conf
cel = make_celery(app_conf.app)
@cel.task
def send_realm_to_fabricdb(realm, form):
some actions...
问题是:
蓝图 "staging" 使用任务 send_realm_to_fabricdb,因此它使得:from tasks import send_realm_to_fabricdb
比,当我只是 运行 应用程序时,一切正常
但是,当我尝试 运行 celery: celery -A app.tasks worker -l info --beat
时,它转到 tasks.py 中的 cel = make_celery(app_conf.app)
,得到 app=None 并再次尝试创建应用程序: 注册蓝图...所以我在这里有循环导入。
你能告诉我如何打破这个循环吗?
提前致谢。
我没有代码来尝试这个,但我认为如果你将 Celery 实例的创建从 tasks.py
移到 create_app
函数中,事情会更好,以便它在创建 app
实例的同时发生。
你在 -A
选项中给 Celery worker 的参数不需要有任务,Celery 只需要 celery 对象,例如,你可以创建一个单独的启动脚本,说 celery_worker.py
调用 create_app
创建 app
和 cel
然后将其作为 -A celery_worker.cel
交给工人,根本不涉及蓝图。
希望对您有所帮助。
我解决这个错误的方法是创建两个 Flask 实例,一个用于 Web 应用程序,另一个用于初始 Celery 实例。
正如@Miguel 所说,我有
- celery_app.py 用于芹菜实例
- manager.py 对于 Flask 实例
在这两个文件中,每个模块都有自己的 Flask 实例。
所以我可以在视图中使用 celery.task。我可以单独开始celery worker
。
感谢Bob Jordan
,您可以从、
中找到答案
要点:
1. make_celery
同时做两件事:创建celery app和运行 celery with flask content,所以你可以创建两个函数来做make_celery
job
2. celery 应用程序必须在蓝图注册前初始化
有同样的问题,我最终使用 shared_task
(docs) 很容易地解决了它,保留一个 app.py 文件而不必多次实例化烧瓶应用程序.
导致循环导入的原情况:
from src.app import celery # src.app is ALSO importing the blueprints which are importing this file which causes the circular import.
@celery.task(bind=True)
def celery_test(self):
sleep(5)
logger.info("Task processed by Celery.")
当前运行良好并避免循环导入的代码:
# from src.app import celery <- not needed anymore!
@shared_task(bind=True)
def celery_test(self):
sleep(5)
logger.info("Task processed by Celery.")
请注意,我是 Celery 的新手,所以我可能正在监督重要的事情,如果有经验的人可以发表意见,那就太好了。
我有一个带有蓝图和 Celery 的应用程序 代码在这里:
config.py
import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or ''
SQLALCHEMY_COMMIT_ON_TEARDOWN = True
RECORDS_PER_PAGE = 40
SQLALCHEMY_DATABASE_URI = ''
CELERY_BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_RESULT_DBURI = ''
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {}
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = True
APP_HOME = ''
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://...'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
class TestConfig(Config):
DEBUG = True
WTF_CSRF_ENABLED = False
TESTING = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
class ProdConfig(Config):
DEBUG = False
WTF_CSRF_ENABLED = True
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
CELERY_BROKER_URL = 'sqla+mysql://...celery'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://.../celery'
CELERY_TIMEZONE = 'Europe/Kiev'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
'send-email-every-morning': {
'task': 'app.workers.tasks.send_email_task',
'schedule': crontab(hour=6, minute=15),
},
}
config = {
'development': DevelopmentConfig,
'default': ProdConfig,
'production': ProdConfig,
'testing': TestConfig,
}
class AppConf:
"""
Class to store current config even out of context
"""
def __init__(self):
self.app = None
self.config = {}
def init_app(self, app):
if hasattr(app, 'config'):
self.app = app
self.config = app.config.copy()
else:
raise TypeError
初始化.py: 导入 os
from flask import Flask
from celery import Celery
from config import config, AppConf
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
app_conf.init_app(app)
# Connect to Staging view
from staging.views import staging as staging_blueprint
app.register_blueprint(staging_blueprint)
return app
def make_celery(app=None):
app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
celery.conf.update(app.conf)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
tasks.py: 从应用程序导入 make_celery、app_conf
cel = make_celery(app_conf.app)
@cel.task
def send_realm_to_fabricdb(realm, form):
some actions...
问题是:
蓝图 "staging" 使用任务 send_realm_to_fabricdb,因此它使得:from tasks import send_realm_to_fabricdb
比,当我只是 运行 应用程序时,一切正常
但是,当我尝试 运行 celery: celery -A app.tasks worker -l info --beat
时,它转到 tasks.py 中的 cel = make_celery(app_conf.app)
,得到 app=None 并再次尝试创建应用程序: 注册蓝图...所以我在这里有循环导入。
你能告诉我如何打破这个循环吗?
提前致谢。
我没有代码来尝试这个,但我认为如果你将 Celery 实例的创建从 tasks.py
移到 create_app
函数中,事情会更好,以便它在创建 app
实例的同时发生。
你在 -A
选项中给 Celery worker 的参数不需要有任务,Celery 只需要 celery 对象,例如,你可以创建一个单独的启动脚本,说 celery_worker.py
调用 create_app
创建 app
和 cel
然后将其作为 -A celery_worker.cel
交给工人,根本不涉及蓝图。
希望对您有所帮助。
我解决这个错误的方法是创建两个 Flask 实例,一个用于 Web 应用程序,另一个用于初始 Celery 实例。
正如@Miguel 所说,我有
- celery_app.py 用于芹菜实例
- manager.py 对于 Flask 实例
在这两个文件中,每个模块都有自己的 Flask 实例。
所以我可以在视图中使用 celery.task。我可以单独开始celery worker
。
感谢Bob Jordan
,您可以从、
要点:
1. make_celery
同时做两件事:创建celery app和运行 celery with flask content,所以你可以创建两个函数来做make_celery
job
2. celery 应用程序必须在蓝图注册前初始化
有同样的问题,我最终使用 shared_task
(docs) 很容易地解决了它,保留一个 app.py 文件而不必多次实例化烧瓶应用程序.
导致循环导入的原情况:
from src.app import celery # src.app is ALSO importing the blueprints which are importing this file which causes the circular import.
@celery.task(bind=True)
def celery_test(self):
sleep(5)
logger.info("Task processed by Celery.")
当前运行良好并避免循环导入的代码:
# from src.app import celery <- not needed anymore!
@shared_task(bind=True)
def celery_test(self):
sleep(5)
logger.info("Task processed by Celery.")
请注意,我是 Celery 的新手,所以我可能正在监督重要的事情,如果有经验的人可以发表意见,那就太好了。