Celery - 运行 一组具有复杂依赖关系的任务

Celery - running a set of tasks with complex dependencies

在我正在处理的应用程序中,用户可以执行由 "steps" 组成的 "transition"。一个步骤可以对其他步骤有任意数量的依赖关系。我希望能够调用转换并让步骤作为单独的 Celery 任务并行执行。

理想情况下,我想要类似于 celery-tasktree 的东西,除了一般的有向无环图,而不仅仅是树,但目前看来还不存在这样的库。

想到的第一个解决方案是标准拓扑排序的并行改编 - 我们不是确定满足依赖关系的步骤的线性排序,而是确定可以并行执行的整个步骤集开头,然后是可以在第 2 轮中执行的整套步骤,依此类推。

但是,当任务花费的时间可变并且工作人员不得不空闲等待更长的 运行ning 任务时,这不是最佳选择,而现在有任务已准备好 运行。 (对于我的具体应用,这个解决方案目前可能没问题,但我仍然想弄清楚如何优化它。)

https://cs.stackexchange.com/questions/2524/getting-parallel-items-in-dependency-resolution 所述,更好的方法是直接在 DAG 之外操作 - 在每个任务完成后,检查它的任何依赖任务现在是否能够 运行,如果可以, 运行 他们。

实施此类项目的最佳方式是什么?我不清楚是否有一种简单的方法可以做到这一点。

据我所知,Celery 的 group/chain/chord 基元不够灵活,无法让我表达完整的 DAG - 虽然我在这里可能是错的?

我想我可以为任务创建一个包装器,在当前任务完成后通知相关任务 - 我不确定处理此类通知的最佳方式是什么。访问应用程序的 Django 数据库并不是特别简洁,并且很难将其分解为通用库,但 Celery 本身并没有为此提供明显的机制。

我也遇到了这个问题,但我真的找不到更好的解决方案或库,除了一个库,对于仍然感兴趣的任何人,您可以查看 https://github.com/selinon/selinon。虽然它只适用于 python 3,但它似乎是唯一能完全按照你的要求做的事情。

Airflow 是另一种选择,但 airflow 与其他 dag 库一样用于更静态的环境中。