是否可以在 Luigi 管道的两个不同位置重用任务?

Is it possible to reuse a task in two different spots in a Luigi pipeline?

我正在开发一个 Luigi 管道,运行 遇到了一个大问题。似乎没有任何方法可以在同一管道中重用任务。 为了说明我所追求的,请考虑以下工作流程:

Collect_Data → Clean_data → Task_on_data_A → Task_on_data_B ↳ Sample_data → Task_on_data_A → Task_on_data_B

我对 Clean_dataSample_data 执行相同的操作。我想在遵循 DRY 原则的同时在 Luigi 中做这样的事情。现在看来我必须从字面上复制并粘贴我的任务 A 和 B 代码才能执行此操作。

Airflow 有 a way to deal with situations like this. Luigi 有吗?

我认为你应该把箭头画成相反方向的依赖箭头。以这种方式考虑它会有所帮助,因为执行路径由 requires 方法确定。在您的场景中,它非常简单,因为您只有一个具有条件依赖性的 TaskA。

您只需要在开始时再添加一个任务,该任务取决于两个 TaskB 和一个指示 taskb 是否要 运行 样本

伪代码:

ParentTask
  def requires():
     return [TaskB(sample_type=True), 
             TaskB(sample_type=False)]

TaskB
  def requires():
    return TaskA(sample_type)

TaskA
  def requires():
    if sample_type:
      return SampleTask()
    else:
      return CleanTask()

SampleTask
  def requires():
    return CleanTask()

CleanTask
  def requires()
    return CollectData()