在 Flask App 中,Celery 应该在哪里实例化?

In a Flask App, where should Celery be instantiated?

我有一个 Flask 应用程序,这是一个非常基本的应用程序,带有 POST 处理程序和一些数据库插入。使用 Celery 将数据库插入设置为任务。如果我将我的 Celery 实例创建和任务定义放在 tasks.py 文件中,并从我的 main.py 文件(也有 Flask 应用程序创建)调用函数,我会得到一个 out of context errortasks.py 文件中的任务依次调用执行数据库插入的数据库 class。如何正确创建 Celery 实例并确保它具有 Flask 上下文?

结构大致如下:

  1. main.py = Flask 应用程序创建、路由处理和 tasks.delay 调用。
  2. tasks.py = Celery 实例创建和任务定义。
  3. DB = 插入。

我希望所有内容都在相同的上下文中工作。

Flask 文档 suggest subclassing Celery 的任务 class 并将任务执行包装在 Flask 应用上下文中。所以在 task.py 中,如果你的 Flask 应用程序实例被命名为 app 而你的 Celery 实例被命名为 celery,你将用新的 sub[=18] 替换 celery 的 Task 属性=]:

TaskBase = celery.Task
class ContextTask(TaskBase):
    abstract = True
    def __call__(self, *args, **kwargs):
        with app.app_context():
            return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask