使用 SQLAlchemy 读取 Celery 任务的结果时出错

Error in reading result of Celery Task using SQLAlchemy

我有一个 Flask 应用程序,我正在使用 Celery(v5.1.2) 和 SQLAlchemy 来 运行 我的工作人员的后台任务。

这就是我创建 Celery worker 的方式...

backend_url = 'mysql://{}:{}@{}/{}'.format(username, password, hostname, db_name)
broker_url = 'sqla+' + backend_url
db_backend_url = 'db+' + backend_url

celery = Celery(
    app.import_name,
    backend=backend_url,
    broker_url=broker_url,
    result_backend=db_backend_url,
    cache_backend=db_backend_url,
    task_serializer='json',
    result_serializer='json',
    include=['my_blueprints.profile'])

初始化和 运行 我的任务工作正常。当我尝试使用以下方式读取结果状态时出现问题:

bg_task = current_app.celery.AsyncResult(MY_TASK_ID)
bg_task_state = bg_task.state

我在尝试 运行 bg_task.state

时收到错误:_pickle.UnpicklingError: pickle data was truncated

我认为这与任务 returns 一个 1MB 的大文件有关。虽然任务是 运行ning a 可以使用上面的两行成功读取任务状态,但是当任务完成时失败。

task_serializer 和 result_serializer 都设置为 'json' 所以我不明白为什么会这样。

在其默认后端中,celery 使用 BLOB 列将其结果存储在 celery_taskmeta table 中,在 mysql 中限制为 64K。在你的芹菜日志中的某个地方,当结果被写入 table.

时,你可能还会看到来自 mysql 的截断警告

celery 结果并不是真正用于传递大文件,而更多地只是为了保存有关任务结果的一些最小细节。

您没有提供很多关于您的用例的详细信息,但通常将如此大的二进制 blob 写入您的数据库是一种气味,或者至少是等待某天发生的头痛。

一个不错的解决方法是将文件写入文件系统或将其上传到您最喜欢的弹性存储区域,然后 return 任务结果中的文件名。需要注意的是确保您的工作节点可以访问与需要结果的节点相同的文件系统。

您也可以通过更改 celery_taskmeta table 以允许 MEDIUMBLOBLONGBLOB 存储在 table 中,但我的直觉是,您最终会希望您没有将文件存储在 RDBMS 中。您必须确保每次从头开始部署应用程序时都进行了更改。