Flask,蓝图使用芹菜任务并获得循环导入

Flask, blueprints uses celery task and got cycle import

我有一个带有蓝图和 Celery 的应用程序 代码在这里:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError

初始化.py: 导入 os

from flask import Flask
from celery import Celery
from config import config, AppConf

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
    celery.conf.update(app.conf)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py: 从应用程序导入 make_celery、app_conf

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    some actions...

问题是: 蓝图 "staging" 使用任务 send_realm_to_fabricdb,因此它使得:from tasks import send_realm_to_fabricdb 比,当我只是 运行 应用程序时,一切正常 但是,当我尝试 运行 celery: celery -A app.tasks worker -l info --beat 时,它转到 tasks.py 中的 cel = make_celery(app_conf.app),得到 app=None 并再次尝试创建应用程序: 注册蓝图...所以我在这里有循环导入。 你能告诉我如何打破这个循环吗? 提前致谢。

我没有代码来尝试这个,但我认为如果你将 Celery 实例的创建从 tasks.py 移到 create_app 函数中,事情会更好,以便它在创建 app 实例的同时发生。

你在 -A 选项中给 Celery worker 的参数不需要有任务,Celery 只需要 celery 对象,例如,你可以创建一个单独的启动脚本,说 celery_worker.py 调用 create_app 创建 appcel 然后将其作为 -A celery_worker.cel 交给工人,根本不涉及蓝图。

希望对您有所帮助。

我解决这个错误的方法是创建两个 Flask 实例,一个用于 Web 应用程序,另一个用于初始 Celery 实例。

正如@Miguel 所说,我有

  • celery_app.py 用于芹菜实例
  • manager.py 对于 Flask 实例

在这两个文件中,每个模块都有自己的 Flask 实例。

所以我可以在视图中使用 celery.task。我可以单独开始celery worker

感谢Bob Jordan,您可以从、

中找到答案

要点:
1. make_celery同时做两件事:创建celery app和运行 celery with flask content,所以你可以创建两个函数来做make_celery job
2. celery 应用程序必须在蓝图注册前初始化

有同样的问题,我最终使用 shared_task (docs) 很容易地解决了它,保留一个 app.py 文件而不必多次实例化烧瓶应用程序.

导致循环导入的原情况:

from src.app import celery  # src.app is ALSO importing the blueprints which are importing this file which causes the circular import.


@celery.task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

当前运行良好并避免循环导入的代码:

# from src.app import celery <- not needed anymore!


@shared_task(bind=True)
def celery_test(self):
    sleep(5)
    logger.info("Task processed by Celery.")

请注意,我是 Celery 的新手,所以我可能正在监督重要的事情,如果有经验的人可以发表意见,那就太好了。