luigi: 重新运行 任务用于调试目的?

luigi: re-running tasks for debugging purposes?

我正在编写一些 luigi 工作流程,并且正在尝试调试这些任务。为此,我需要用相同的参数一遍又一遍地重新运行这些任务,直到它们最终完成我想要的。

我知道 luigi 任务是幂等的,因此在给予与之前相同的输入时通常不会重新 运行,而这正是 之后所需要的 工作流已调试并投入生产。然而,在开发过程中,使用完全相同的输入和输出重新运行工作流是有用的——而且,我认为,这是必要的。

我知道我可以在开发期间的每个任务中将 complete() 方法重写为 return False。但是,这会使任务处于未完成状态。

我正在寻找一种方法,以某种 "development" 或 "debug" 模式将我的工作流程设置为 运行,这样我就可以 运行 并重新运行 它一遍又一遍地完成,即使所有任务 运行 都正确,直到我确定工作流程完全按照我的要求进行。

在 luigi 中有什么方法可以做到这一点吗?

提前谢谢你。

================稍后添加================

根据我在下面的评论,似乎更改任务的输入参数不会导致它重新 运行。仅当其 output() 方法 return 具有唯一值时,该任务才能重新 运行 启用。这似乎违背了 "idempotent" 的定义,因为更改输入参数应该将真正的幂等任务视为一个新的、唯一的实体,而不管它是否碰巧 return 与另一个调用相同的输出不同的输入参数。

下面的代码说明了这个问题。 "x"参数决定了output()方法returns的文件名,而"y"参数是在输出内容内部使用的,而不是输出文件名.

如果我用“--x 10 --y 20”然后“--x 10 --y 30”调用我的工作流,第二次调用不会导致任何一个任务被重新运行。我认为这是不正确的行为。但是,如果我用“--x 10 --y 20”后跟“--x 11 --y 20”来调用工作流,那么这两个任务确实会重新 运行.

#!/usr/bin/python3                                                                                                              
# -*- python -*-                                                                                                                

import luigi

class Child(luigi.Task):

    x = luigi.Parameter()
    y = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("child_{}.txt".format(self.x))

    def run(self):
        with self.output().open('w') as f:
            f.write('{} {}\n'.format(self.x, self.y))

class Parent(luigi.Task):

    x = luigi.Parameter()
    y = luigi.Parameter()

    def requires(self):
        return [ Child(self.x, self.y) ]

    def output(self):
        return luigi.LocalTarget("parent_{}.txt".format(self.x))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                fout.write("from command line: --x {} --y {}, from child: {}\n".format(self.x, self.y, line.strip()))

if __name__ == '__main__':
    luigi.run()

如您所说,调试模式会很棒。但我认为 Luigi 没有那样的东西。

您可以做的一个技巧是在任务调用 complete() 方法之前删除目标,如 here 所示。您的任务必须是此 class 的子 class,因此您可以在执行前使用 --force 参数重置它。

请注意,此解决方案仅在您的任务将本地文件作为输出时才有效。您必须自定义它才能删除 S3 keys/buckets、数据库表或行等。