使用 Celery worker 与 SQLAlchemy DB 交互,包括从请求中了解用户
Using a Celery worker to interact with a SQLAlchemy DB, including knowing the user from the request
我对此进行了大量研究,包括尝试像 this 这样的答案。 Celery 似乎无法访问我的 Flask 应用程序的上下文。
我非常了解我的 celery 对象,它将装饰我的任务,必须能够访问我的 Flask 应用程序的上下文。我确实相信它应该,因为我按照 this 指南创建了我的芹菜对象。我不确定我使用 Flask-HTTPAuth 是否存在混淆。
这是我的一些资料。
def make_celery(app):
celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
items = g.user.items
for item in items:
print(item)
运行 不过,这项使用 Flask 的任务是不行的。我尝试通过点击服务器来启动此功能(在授权的情况下!)。
@app.route("/item_loop")
@auth.login_required
def item_loop():
result = loop.delay()
return "It's running."
但是 Celery worker 告诉我任务 raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",)
,我相信这意味着,如前所述,我的 celery 对象没有应用程序上下文,即使我使用了推荐的工厂模式。
要从任务执行中检索用户,您可以尝试传递 User 对象(如果 celery 可以腌制它),或者传递足够的信息以使任务可以检索 User 对象(例如用户的 id)。在后一种情况下,您的任务看起来像
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
user = User.query.get(user_id)
items = user.items
for item in items:
print(item)
你会通过
开始(假设你使用的是flask_login)
result = loop.delay(current_user.id)
正如@Dave W. Smith 所指出的,与其依赖 g
来检索用户,将用户信息作为参数传递给 Celery 任务可能是更好的方法。根据Flask documentation on app context,g
的生命周期是一个请求。由于 Celery 任务是异步执行的,因此它将在与您定义用户的请求中不同的应用程序上下文中执行。
虽然 Dave 和 Greg 的回答中的推荐是有效的,但他们没有强调的是您对在 Celery 任务中使用应用程序上下文存在误解。
您有一个 Flask 应用程序,您在其中使用 Flask-HTTPAuth。您可能有一个 verify_password
处理程序将 g.user
设置为经过身份验证的用户。这意味着当您处理请求时,您可以作为 g.user
访问用户。这一切都很好。
您还有一个或多个 Celery worker,它们是独立的进程,与 Flask 服务器没有直接连接。 Flask 服务器和 Celery 工作进程之间的唯一通信发生在您使用的消息代理(通常是 Redis 或 RabbitMQ)上。
根据您的需要,Celery 工作人员可能需要访问 Flask 应用程序。这在使用将配置存储在 app.config
字典中的 Flask 扩展时非常常见。需要这个的两个常见扩展是 Flask-SQLAlchemy 和 Flask-Mail。如果无法访问 app.config
,Celery 任务将无法打开与数据库的连接或发送电子邮件,因为它不知道数据库 and/or 电子邮件服务器的详细信息。
为了让 Celery 工作人员能够访问配置,公认的做法是在每个工作人员中创建重复的 Flask 应用程序。这些是辅助应用程序,它们与主 Flask 服务器使用的实际应用程序对象没有任何联系。它们的唯一目的是保存原始 app.config
字典的副本,您的任务或您的任务正在使用的任何 Flask 扩展都可以访问该字典。
因此,期望 Flask 服务器中的 g.user
集在 Celery 任务中也可以作为 g.user
访问是无效的,因为它们是不同的 g
对象,来自不同的应用程序实例。
如果您需要在 Celery 任务中使用经过身份验证的用户,您应该做的是将 user_id
(通常是 g.user.id
)作为参数传递给您的任务。然后在您的任务中,您可以使用此 id
从数据库加载用户。希望这对您有所帮助!
我对此进行了大量研究,包括尝试像 this 这样的答案。 Celery 似乎无法访问我的 Flask 应用程序的上下文。
我非常了解我的 celery 对象,它将装饰我的任务,必须能够访问我的 Flask 应用程序的上下文。我确实相信它应该,因为我按照 this 指南创建了我的芹菜对象。我不确定我使用 Flask-HTTPAuth 是否存在混淆。
这是我的一些资料。
def make_celery(app):
celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
items = g.user.items
for item in items:
print(item)
运行 不过,这项使用 Flask 的任务是不行的。我尝试通过点击服务器来启动此功能(在授权的情况下!)。
@app.route("/item_loop")
@auth.login_required
def item_loop():
result = loop.delay()
return "It's running."
但是 Celery worker 告诉我任务 raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",)
,我相信这意味着,如前所述,我的 celery 对象没有应用程序上下文,即使我使用了推荐的工厂模式。
要从任务执行中检索用户,您可以尝试传递 User 对象(如果 celery 可以腌制它),或者传递足够的信息以使任务可以检索 User 对象(例如用户的 id)。在后一种情况下,您的任务看起来像
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
user = User.query.get(user_id)
items = user.items
for item in items:
print(item)
你会通过
开始(假设你使用的是flask_login)result = loop.delay(current_user.id)
正如@Dave W. Smith 所指出的,与其依赖 g
来检索用户,将用户信息作为参数传递给 Celery 任务可能是更好的方法。根据Flask documentation on app context,g
的生命周期是一个请求。由于 Celery 任务是异步执行的,因此它将在与您定义用户的请求中不同的应用程序上下文中执行。
虽然 Dave 和 Greg 的回答中的推荐是有效的,但他们没有强调的是您对在 Celery 任务中使用应用程序上下文存在误解。
您有一个 Flask 应用程序,您在其中使用 Flask-HTTPAuth。您可能有一个 verify_password
处理程序将 g.user
设置为经过身份验证的用户。这意味着当您处理请求时,您可以作为 g.user
访问用户。这一切都很好。
您还有一个或多个 Celery worker,它们是独立的进程,与 Flask 服务器没有直接连接。 Flask 服务器和 Celery 工作进程之间的唯一通信发生在您使用的消息代理(通常是 Redis 或 RabbitMQ)上。
根据您的需要,Celery 工作人员可能需要访问 Flask 应用程序。这在使用将配置存储在 app.config
字典中的 Flask 扩展时非常常见。需要这个的两个常见扩展是 Flask-SQLAlchemy 和 Flask-Mail。如果无法访问 app.config
,Celery 任务将无法打开与数据库的连接或发送电子邮件,因为它不知道数据库 and/or 电子邮件服务器的详细信息。
为了让 Celery 工作人员能够访问配置,公认的做法是在每个工作人员中创建重复的 Flask 应用程序。这些是辅助应用程序,它们与主 Flask 服务器使用的实际应用程序对象没有任何联系。它们的唯一目的是保存原始 app.config
字典的副本,您的任务或您的任务正在使用的任何 Flask 扩展都可以访问该字典。
因此,期望 Flask 服务器中的 g.user
集在 Celery 任务中也可以作为 g.user
访问是无效的,因为它们是不同的 g
对象,来自不同的应用程序实例。
如果您需要在 Celery 任务中使用经过身份验证的用户,您应该做的是将 user_id
(通常是 g.user.id
)作为参数传递给您的任务。然后在您的任务中,您可以使用此 id
从数据库加载用户。希望这对您有所帮助!