Can luigi rerun tasks when the task dependencies become out of date?

As far as I know, a luigi.Target either exists or it doesn't, so if a luigi.Target exists, its task won't be recomputed.

I'm looking for a way to force a task to be recomputed if one of its dependencies is modified, or if the code of one of the tasks changes.

One way to achieve your goal is to override the complete(...) method.

The documentation for complete is straightforward.

Just implement a function that checks your constraints and returns False if you want the task to be recomputed.

For example, to force recomputation when a dependency has been updated, you could do:

def complete(self):
    """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
    import os
    import time

    def mtime(path):
        return time.ctime(os.path.getmtime(path))

    # assuming 1 output
    if not os.path.exists(self.output().path):
        return False

    self_mtime = mtime(self.output().path) 

    # the below assumes a list of requirements, each with a list of outputs. YMMV
    for el in self.requires():
        if not el.complete():
            return False
        for output in el.output():
            if mtime(output.path) > self_mtime:
                return False

    return True

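This returns False when any requirement is incomplete, when any requirement has been modified more recently than the current task, or when the current task's output does not exist.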
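The same staleness rule can be pulled out into a plain function over file paths, which makes it easy to test without running a scheduler. This is a minimal sketch assuming local files that already exist for the inputs; the name `is_out_of_date` is hypothetical, and it compares the raw float values from `os.path.getmtime`.

```python
import os
from typing import Iterable


def is_out_of_date(output_paths: Iterable[str], input_paths: Iterable[str]) -> bool:
    """Return True if the task should be rerun (hypothetical helper).

    A task is out of date when any output is missing, or when any input
    was modified more recently than the oldest output. Inputs are assumed
    to exist (luigi would have checked the requirement's complete() first).
    """
    outputs = list(output_paths)
    if not outputs or not all(os.path.exists(p) for p in outputs):
        return True
    # Compare against the oldest output so that one stale file is enough to trigger a rerun.
    oldest_output = min(os.path.getmtime(p) for p in outputs)
    return any(os.path.getmtime(p) > oldest_output for p in input_paths)
```

Inside a task, complete() could then simply return `not is_out_of_date(...)` with the output and input paths collected from `self.output()` and `self.input()`.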

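Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it would be hit-or-miss unless every task has its own file.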
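One variation on that idea, sketched under the same one-task-per-file assumption: hash the task's source file and record the digest next to the output, then have complete() return False when the digest no longer matches. Unlike an mtime check, a content hash is not fooled by a checkout or copy that touches the file without changing it. The helper names and the sidecar "stamp" file are hypothetical, not part of luigi.

```python
import hashlib
import os


def source_digest(source_path: str) -> str:
    """SHA-256 of a task's source file (hypothetical helper)."""
    with open(source_path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def code_changed(source_path: str, stamp_path: str) -> bool:
    """True if no digest was recorded yet, or the source no longer matches it."""
    if not os.path.exists(stamp_path):
        return True
    with open(stamp_path) as f:
        return f.read().strip() != source_digest(source_path)


def record_digest(source_path: str, stamp_path: str) -> None:
    """Store the current digest, e.g. at the end of a successful run()."""
    with open(stamp_path, "w") as f:
        f.write(source_digest(source_path))
```

In a task, `source_path` could come from `inspect.getfile(type(self))`, with `record_digest` called as the last step of `run()` and `code_changed` checked inside `complete()`.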

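Because you can override complete, you can implement whatever logic you want for recomputation. If you want a particular complete method for many tasks, I'd recommend subclassing luigi.Task, implementing your custom complete there, and then inheriting your tasks from that subclass.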

I'm late to the party, but here's a mixin that improves on the accepted answer to support multiple input/output files.

import os
import time

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on  but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            return time.ctime(os.path.getmtime(path))

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True

To use it, just declare your class with the mixin, e.g. class MyTask(MTimeMixin, luigi.Task).
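The to_list helper in the mixin only unwraps lists and tuples, but a luigi task can also return a dict from requires() or output() (accessed as self.input()['name']). A sketch of a recursive replacement that also handles dicts and None; the name `flatten_targets` is hypothetical. luigi also ships a helper, `luigi.task.flatten`, intended for the same job, which may be preferable if your version provides it.

```python
from typing import Any, List


def flatten_targets(obj: Any) -> List[Any]:
    """Flatten a single item, list/tuple, dict, or None into a flat list.

    Hypothetical drop-in replacement for the mixin's to_list helper.
    """
    if obj is None:
        return []
    if isinstance(obj, dict):
        # Dict values are the targets; the keys are only labels.
        return [item for value in obj.values() for item in flatten_targets(value)]
    if isinstance(obj, (list, tuple)):
        return [item for value in obj for item in flatten_targets(value)]
    return [obj]
```

Swapping it in means replacing each `to_list(...)` call in `complete()` with `flatten_targets(...)`; nested structures such as a dict of lists then work too.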

The code above works well for me, except that I believe a correct timestamp comparison needs mtime(path) to return a float rather than a string (as strings, "Sat ..." > "Mon ..."). So use

def mtime(path):
    return os.path.getmtime(path)

instead of:

def mtime(path):
    return time.ctime(os.path.getmtime(path))
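The problem is easy to reproduce: time.ctime turns a timestamp into text that starts with the weekday, so a later Monday compares as "smaller" than an earlier Saturday. A small demonstration with two fixed dates chosen only for illustration:

```python
import calendar
import time

# A Saturday and the following Monday, both at noon UTC.
saturday = calendar.timegm((2024, 1, 6, 12, 0, 0, 0, 0, 0))
monday = calendar.timegm((2024, 1, 8, 12, 0, 0, 0, 0, 0))

# time.ctime formats *local* time; asctime(gmtime(...)) produces the same format in UTC,
# which keeps this demonstration independent of the machine's time zone.
sat_str = time.asctime(time.gmtime(saturday))  # 'Sat Jan  6 12:00:00 2024'
mon_str = time.asctime(time.gmtime(monday))    # 'Mon Jan  8 12:00:00 2024'

print(sat_str > mon_str)   # True: 'S' sorts after 'M', so the string test gets it backwards
print(saturday > monday)   # False: comparing the floats gives the right answer
```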

Regarding the mixin suggested by Shilad Sen, consider this example:

# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(MTimeMixin, luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()

where MTimeMixin is the same as in the post above. I run the task once using

luigi --module run_luigi SquaredNumbers

Then I touch the file numbers_up_to_10.txt and run the task again. Luigi then gives the following error:

  File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
    os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'

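This may just be a Windows problem and not one on Linux, where "mv a b" would simply delete the old b if it already exists and isn't write-protected. We can fix it with the following patch to luigi/local_target.py: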
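def move_to_final_destination(self):
    # On Windows os.rename fails if the destination exists, so move the old
    # output aside under a timestamped name first (requires `import time`)
    if os.path.exists(self.path):
        os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
    os.rename(self.tmp_path, self.path)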
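If editing luigi's installed source is undesirable, the standard library already has a rename that overwrites on every platform: `os.replace` (Python 3.3+) replaces an existing destination file on Windows as well as POSIX, whereas `os.rename` raises `FileExistsError` on Windows. A minimal sketch of the same step as a standalone function; the function and its parameters are hypothetical, and unlike the patch above it discards the old output instead of keeping a timestamped backup.

```python
import os


def move_to_final_destination(tmp_path: str, path: str) -> None:
    """Move a finished temporary file onto its final path (hypothetical standalone version).

    os.replace overwrites an existing file at `path` on Windows as well as POSIX,
    so rerunning a task whose old output still exists no longer fails.
    """
    os.replace(tmp_path, path)
```

Whether to keep a backup of the previous output is a design choice: the timestamped rename preserves history but accumulates files, while `os.replace` keeps the directory clean.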

For completeness, here is the mixin again as a standalone file, including the fix from the other post:

import os

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on  but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True