如何访问由多个任务组成的 celery AsyncResult/Signature 的任务依赖关系图?
How can I access the task dependency graph of a celery AsyncResult/Signature comprised of multiple tasks?
我希望能够提取芹菜的任务依赖关系图 AsyncResult/Signature
。我的 AsyncResult/Signature
可能是一个复杂的 chain/group/chord。我想从 parent/children AsyncResults
中提取 task_id
的图形并将其序列化,以便以后可以从 task_id
字符串中重构 AsyncResult
.
我怀疑此输出将来自遍历 AsyncResult.children
或 AsyncResult.parent
任务树,但想看看 celery
中是否已经存在任何内容,而不必编写我的自己的遍历代码。
我想要一个大致类似于以下内容的输出:
{
"GroupTask-id-xxx": [
"Task-id-xxx",
"Task-id-xxx",
]
}
GitHub 上的 this answer 中似乎讨论了很多要使用的工具。
主要工具:
Async/GroupResult.as_tuple()
给出了整个依赖图的序列化元组表示。与 celery.result.result_from_tuple(tuple_representation)
结合使用以从元组序列化中重新水合 Async/GroupResult
。
my_group_result.save()
如果将来尝试从单个 ID 访问 GroupResult
objects。这会将元组表示保存到后端。与 GroupResult.restore(group_id)
结合使用。请注意,它不会捕获组的 parents 或其 children AsyncResults
.
如果您想将这些结果保存到数据库中,以便在未来某个日期可以完全检索它们,只需 id
提供以下方法:
import json
from celery.result import AsyncResult, GroupResult, result_from_tuple
def save_result(result: Union[AsyncResult, GroupResult]) -> None:
"""Save result compute structure to backend"""
result.backend.set(result.id, json.dumps(result.as_tuple()))
def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
"""Restore result (including parents) from backend
Raises:
ValueError if result not found in backend
"""
try:
return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
except TypeError:
raise ValueError(f"Result id '{result_id}', not found.")
我希望能够提取芹菜的任务依赖关系图 AsyncResult/Signature
。我的 AsyncResult/Signature
可能是一个复杂的 chain/group/chord。我想从 parent/children AsyncResults
中提取 task_id
的图形并将其序列化,以便以后可以从 task_id
字符串中重构 AsyncResult
.
我怀疑此输出将来自遍历 AsyncResult.children
或 AsyncResult.parent
任务树,但想看看 celery
中是否已经存在任何内容,而不必编写我的自己的遍历代码。
我想要一个大致类似于以下内容的输出:
{
"GroupTask-id-xxx": [
"Task-id-xxx",
"Task-id-xxx",
]
}
GitHub 上的 this answer 中似乎讨论了很多要使用的工具。
主要工具:
Async/GroupResult.as_tuple()
给出了整个依赖图的序列化元组表示。与celery.result.result_from_tuple(tuple_representation)
结合使用以从元组序列化中重新水合Async/GroupResult
。my_group_result.save()
如果将来尝试从单个 ID 访问GroupResult
objects。这会将元组表示保存到后端。与GroupResult.restore(group_id)
结合使用。请注意,它不会捕获组的 parents 或其 childrenAsyncResults
.
如果您想将这些结果保存到数据库中,以便在未来某个日期可以完全检索它们,只需 id
提供以下方法:
import json
from celery.result import AsyncResult, GroupResult, result_from_tuple
def save_result(result: Union[AsyncResult, GroupResult]) -> None:
"""Save result compute structure to backend"""
result.backend.set(result.id, json.dumps(result.as_tuple()))
def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
"""Restore result (including parents) from backend
Raises:
ValueError if result not found in backend
"""
try:
return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
except TypeError:
raise ValueError(f"Result id '{result_id}', not found.")