如何重置 luigi 任务状态?

How to reset luigi task status?

目前,我有一堆 luigi 任务排在一起,有一个简单的依赖链(a -> b -> c -> d)。 d 先执行,a 最后执行。 a 是被触发的任务。

a return 之外的所有目标都是一个 luigi.LocalTarget() 对象,并且有一个通用的 luigi.Parameter() ,它是一个字符串(包含日期和时间)。在 luigi 中央服务器(启用了历史记录)上运行。

问题是,当我重新运行 所述任务 a 时,luigi 检查历史并查看该特定任务之前是否已经 运行,如果它已经DONE 状态,它没有 运行 任务(在这种情况下为 d),我不能这样做,更改字符串无济于事(向其添加随机微秒)。如何强制执行 运行 任务?

首先评论:Luigi 任务是幂等的。如果您 运行 一个具有相同参数值的任务,无论您 运行 它多少次,它都必须始终 return 相同的输出。所以多次 运行 是没有意义的。这让 Luigi 变得强大:如果你有一项大任务需要做很多事情并且需要花费很多时间并且它在某处失败了,你将不得不从头开始 运行 。如果您将其拆分为更小的任务,运行 它失败了,您只需 运行 管道中的其余任务。

当您 运行 一个任务时,Luigi 检查该任务的输出以查看它们是否存在。如果他们不这样做,Luigi 会检查它所依赖的任务的输出。如果它们存在,那么它只会 运行 当前任务并生成输出 Target。如果依赖项输出不存在,那么它将 运行 该任务。

因此,如果您想重新运行 一项任务,您必须删除其 Target 输出。如果你想重新运行整个管道,你必须删除任务在级联中依赖的所有任务的所有输出。

有一个 ongoing discussion in this issue in Luigi repository. Take a look at this comment,因为它会指向一些脚本,用于获取给定任务的输出目标并删除它们。

d6tflow allows you to reset and force rerun of tasks, see details at https://d6tflow.readthedocs.io/en/latest/control.html#manually-forcing-task-reset-and-rerun

# force execution including downstream tasks
d6tflow.run([TaskTrain()],force=[TaskGetData()])

# reset single task
TaskGetData().invalidate()

# reset all downstream task output
d6tflow.invalidate_downstream(TaskGetData(), TaskTrain())

# reset all upstream task input
d6tflow.invalidate_upstream(TaskTrain())

警告:它仅适用于 d6tflow 任务和目标,它们是修改后的本地目标,但不适用于所有 luigi 目标。应该带你走很长的路,并针对数据科学工作流程进行了优化。适用于本地工作人员,尚未在中央服务器上测试。

我通常通过覆盖 complete():

来做到这一点
class BaseTask(luigi.Task):

    force = luigi.BoolParameter()

    def complete(self):
        outputs = luigi.task.flatten(self.output())
        for output in outputs:
            if self.force and output.exists():
                output.remove()
        return all(map(lambda output: output.exists(), outputs))


class MyTask(BaseTask):
    def output(self):
        return luigi.LocalTarget("path/to/done/file.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write('Complete')

当您 运行 任务时,会按预期创建输出文件。使用 force=True 实例化 class 后,输出文件将仍然存在,直到调用 complete()

task = MyTask()
task.run()
task.complete()
# True

new_task = MyTask(force=True)
new_task.output().exists()
# True
new_task.complete()
# False
new_task.output().exists()
# False

我用它来强制重新生成输出,而无需先将其删除,并允许您 select 重新生成哪些类型。在我们的用例中,我们希望旧生成的文件继续存在,直到它们被新版本重写。

# generation.py
class ForcibleTask(luigi.Task):
    force_task_families = luigi.ListParameter(
        positional=False, significant=False, default=[]
    )

    def complete(self):
        print("{}: check {}".format(self.get_task_family(), self.output().path))
        if not self.output().exists():
            self.oldinode = 0  # so any new file is considered complete
            return False
        curino = pathlib.Path(self.output().path).stat().st_ino
        try:
            x = self.oldinode
        except AttributeError:
            self.oldinode = curino

        if self.get_task_family() in self.force_task_families:
            # only done when file has been overwritten with new file
            return self.oldinode != curino

        return self.output().exists()

用法示例

class Generate(ForcibleTask):
    date = luigi.DateParameter()
    def output(self):
        return luigi.LocalTarget(
            self.date.strftime("generated-%Y-%m-%d")
        )

调用

luigi --module generation Generate '--Generate-force-task-families=["Generate"]'

@cangers BaseTask 的改进,可以在无法删除目标时引发错误。

class BaseTask(luigi.Task):
    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    if self.force is True:
        outputs = luigi.task.flatten(self.output())
        for out in outputs:
            if out.exists():
                try:
                    out.remove()
                except AttributeError:
                    raise NotImplementedError


      

您可以使用 in-memory 输出存储,它们每次都会被清除。我有一个解决方案,不知道是否适合你的需要。

import uuid
class taskname(luigi.Task):
    id = luigi.Parameter(default=uuid.uuid5(uuid.NAMESPACE_DNS, random().__str__()).__str__(), positional=True) # this helps in getting a new id everytime it is executed.

   def output(self):
    # This is just to ensure the task is complete
      return luigi.mock.MockTarget(f'taskname-{self.id}')
   def run(self):
     #do your process here
     # if your process is successful then run this
     self.output().open('w').close()  #persists the object in memory for the scheduler to understand the task is complete.

我们使用在 class 中创建的 id 在输出中命名模拟目标。因此,中央调度程序无法找到此输出,即使您同时 运行 同一个 DAG 两次。只有当前批次才能在内存中访问它们。