在 Flask App 中,Celery 应该在哪里实例化?
In a Flask App, where should Celery be instantiated?
我有一个 Flask 应用程序,这是一个非常基本的应用程序,带有 POST 处理程序和一些数据库插入。使用 Celery 将数据库插入设置为任务。如果我将我的 Celery 实例创建和任务定义放在 tasks.py
文件中,并从我的 main.py
文件(也有 Flask 应用程序创建)调用函数,我会得到一个 out of context error
。 tasks.py
文件中的任务依次调用执行数据库插入的数据库 class。如何正确创建 Celery 实例并确保它具有 Flask 上下文?
结构大致如下:
- main.py = Flask 应用程序创建、路由处理和 tasks.delay 调用。
- tasks.py = Celery 实例创建和任务定义。
- DB = 插入。
我希望所有内容都在相同的上下文中工作。
Flask 文档 suggest subclassing Celery 的任务 class 并将任务执行包装在 Flask 应用上下文中。所以在 task.py
中,如果你的 Flask 应用程序实例被命名为 app
而你的 Celery 实例被命名为 celery
,你将用新的 sub[=18] 替换 celery
的 Task 属性=]:
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
我有一个 Flask 应用程序,这是一个非常基本的应用程序,带有 POST 处理程序和一些数据库插入。使用 Celery 将数据库插入设置为任务。如果我将我的 Celery 实例创建和任务定义放在 tasks.py
文件中,并从我的 main.py
文件(也有 Flask 应用程序创建)调用函数,我会得到一个 out of context error
。 tasks.py
文件中的任务依次调用执行数据库插入的数据库 class。如何正确创建 Celery 实例并确保它具有 Flask 上下文?
结构大致如下:
- main.py = Flask 应用程序创建、路由处理和 tasks.delay 调用。
- tasks.py = Celery 实例创建和任务定义。
- DB = 插入。
我希望所有内容都在相同的上下文中工作。
Flask 文档 suggest subclassing Celery 的任务 class 并将任务执行包装在 Flask 应用上下文中。所以在 task.py
中,如果你的 Flask 应用程序实例被命名为 app
而你的 Celery 实例被命名为 celery
,你将用新的 sub[=18] 替换 celery
的 Task 属性=]:
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask