以 DAG 方式调度作业

Scheduling jobs in a DAG-manner

我们有一个包含不同类型工作的系统。让我们称他们为:

job_1
job_2
job_3

它们都需要不同的参数集(和可选参数)。 IE。我们 运行 job_1(x) 不同 x= A, B, C ...job_2 运行 一组参数取决于 job_1(x) 的结果,并且 job_2 加载 job_A(x) 存储的数据。等等。

结果是依赖关系的树结构。现在,这些工作偶尔会因为某种原因而失败。因此,如果 job_A for x=B 失败,树的分支将完全失败并且不应该 运行。不过,所有其他分支机构都应该 运行。

所有作业均以 Python 编写并使用并行性(基于产生 SLURM 作业)。它们是用 cron 安排的。这显然不是很好并且有两个主要缺点:

为了解决这个问题,我们正在寻找用于调度或可视化的气流,因为它是用 Python 编写的,它似乎大致符合我们的需求。不过,我看到了不同的挑战:

airflow 是一个不错的选择吗?我知道还有其他一些(luigi、Azkaban 等)与 Hadoop 堆栈有些相关(我们没有使用它,因为它不是大数据)。需要多少黑客攻击?多少黑客行为是明智的?

我真的不能为airflow说话,但我可以为luigi说话。

路易吉简介: Luigi 是为数据流和数据依赖而设计的,就像 airflow 一样,但它是在 Spotify 开发的。数据流中的每个步骤都表示为继承自 luigi.Task 的 class,我们将每个步骤称为任务。每个任务都由三个主要函数组成,并且也有参数声明。三个函数及其说明:

  1. 要求:在此函数中,您通过返回这些任务来指定手头的任务依赖于哪些任务。
  2. 输出:在此函数中,您通过返回 class Luigi.LocalTarget(或类似但用于远程)的对象来指定保存此任务结果的位置。
  3. 运行:在此函数中,您指定任务为 运行 时实际发生的情况。

注意:luigi 中央调度程序通过检查文件是否存在来知道任务何时完成 - 特别是在任务的 requires 函数中返回的任务输出函数中指定的文件是 运行。

Can we filter failed jobs, and just look at their dependencies?

Luigi 记录传递给每个任务的所有参数以及对每个任务 运行 的每次尝试。默认情况下,luigi 不保存日志,但您可以轻松设置它。去年夏天我做了一个很大的 luigi 管道,我让它保存了日志。然后它使用模糊字符串比较(使用 Levenshtein 库)来删除重复行并大量压缩日志,然后基本上搜索单词 "error",如果出现,它会发送一封电子邮件给我里面的压缩日志。

We have parallelism within the job (and we need it otherwise the jobs run for more than a day, and we want to rerun the whole lot every day) does that screw up our scheduling?

I 运行 内部具有并行性的任务没有问题。但是,某些库可能会导致问题,例如gensim.

We want to change our jobs and data management as little as possible.

您通常可以将大部分计算粘贴到 luigi 任务的 运行 函数中。

Can we implement the rule system of what jobs to spawn next in a easily understandable way?

我相信是的,是的。对于每个任务,您在任务的 requires 函数中指定它所依赖的任务。

另外需要考虑的是文档。 Luigi 的文档非常好,但它并没有尽可能地流行起来。 Luigi 的社区不是很大,也不是非常活跃。据我所知,Airflow 具有相当的可比性,但它较新,因此目前可能有一个更活跃的社区。

Here 是 luigi 作者的博客 post,其中对 luigi 和较新的替代品进行了一些简要比较。他的结论是:他们都很烂。包括路易吉。