如何访问由多个任务组成的 celery AsyncResult/Signature 的任务依赖关系图?

How can I access the task dependency graph of a celery AsyncResult/Signature comprised of multiple tasks?

我希望能够提取芹菜的任务依赖关系图 AsyncResult/Signature。我的 AsyncResult/Signature 可能是一个复杂的 chain/group/chord。我想从 parent/children AsyncResults 中提取 task_id 的图形并将其序列化,以便以后可以从 task_id 字符串中重构 AsyncResult .

我怀疑此输出将来自遍历 AsyncResult.childrenAsyncResult.parent 任务树,但想看看 celery 中是否已经存在任何内容,而不必编写我的自己的遍历代码。

我想要一个大致类似于以下内容的输出:

{
    "GroupTask-id-xxx": [
        "Task-id-xxx",
        "Task-id-xxx",
    ]
}

GitHub 上的 this answer 中似乎讨论了很多要使用的工具。

主要工具:

  • Async/GroupResult.as_tuple() 给出了整个依赖图的序列化元组表示。与 celery.result.result_from_tuple(tuple_representation) 结合使用以从元组序列化中重新水合 Async/GroupResult
  • my_group_result.save() 如果将来尝试从单个 ID 访问 GroupResult objects。这会将元组表示保存到后端。与 GroupResult.restore(group_id) 结合使用。请注意,它不会捕获组的 parents 或其 children AsyncResults.

如果您想将这些结果保存到数据库中,以便在未来某个日期可以完全检索它们,只需 id 提供以下方法:

import json
from celery.result import AsyncResult, GroupResult, result_from_tuple

def save_result(result: Union[AsyncResult, GroupResult]) -> None:
    """Save result compute structure to backend"""
    result.backend.set(result.id, json.dumps(result.as_tuple()))


def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
    """Restore result (including parents) from backend

    Raises:
        ValueError if result not found in backend
    """
    try:
        return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
    except TypeError:
        raise ValueError(f"Result id '{result_id}', not found.")