如何使用上游任务的输出来驱动 requires() 的结果?

How to use the output of an upstream task to drive the result of requires()?

在 Luigi 中,我有一个任务,我想根据另一个上游任务的输出动态生成依赖项列表。例如:

class TaskA:
  param = IntParameter()

class TaskB:
  def main(self):
    pass
  def output(self):
    return [1,2,3,4]

class TaskC:
  def requires(self):
    return [TaskB()] + [TaskA(param=p) for p in TaskB().output()]

总而言之,我根据 TaskB 的输出在 TaskC 中创建了一组 TaskA 依赖项。

我已经尝试了一些东西,但似乎 Luigi 感到困惑,因为 TaskB 确实需要 运行 在 TaskC 可以 return 其依赖项列表之前。但显然 Luigi 在调用 TaskC.requires()

之前不能 运行 任何东西

有什么方法可以让这项工作完成我在这里想做的事情吗?

在我的真实场景中,这些任务的实现要复杂得多,但这是任务连接方式的要点。

这是一个很好的问题! Luigi 实际上为此提供了完美的解决方案,并在文档中进行了介绍:https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies

基本上,您将需要 TaskB 然后 yieldrun 函数中基于输出的新任务。让我举个例子给你看:

class TaskC:
  def requires(self):
    return TaskB()

  def run(self):
    yield [TaskA(param=p) for p in self.input()]

您可以利用将任务 B 的输出存储在临时位置(如 Redis)并在任务 C 中调用它。然后在TaskC中,可以根据需要使用“yield”来创建任务。 yield 可以在 requires() 方法和 运行() 方法中使用。如果在 运行() 方法中使用,则必须处理依赖项及其故障。