是否可以在 Luigi 管道的两个不同位置重用任务?
Is it possible to reuse a task in two different spots in a Luigi pipeline?
我正在开发一个 Luigi 管道,运行 遇到了一个大问题。似乎没有任何方法可以在同一管道中重用任务。
为了说明我所追求的,请考虑以下工作流程:
Collect_Data → Clean_data → Task_on_data_A → Task_on_data_B
↳ Sample_data → Task_on_data_A → Task_on_data_B
我对 Clean_data
和 Sample_data
执行相同的操作。我想在遵循 DRY 原则的同时在 Luigi 中做这样的事情。现在看来我必须从字面上复制并粘贴我的任务 A 和 B 代码才能执行此操作。
Airflow 有 a way to deal with situations like this. Luigi 有吗?
我认为你应该把箭头画成相反方向的依赖箭头。以这种方式考虑它会有所帮助,因为执行路径由 requires 方法确定。在您的场景中,它非常简单,因为您只有一个具有条件依赖性的 TaskA。
您只需要在开始时再添加一个任务,该任务取决于两个 TaskB 和一个指示 taskb 是否要 运行 样本
伪代码:
ParentTask
def requires():
return [TaskB(sample_type=True),
TaskB(sample_type=False)]
TaskB
def requires():
return TaskA(sample_type)
TaskA
def requires():
if sample_type:
return SampleTask()
else:
return CleanTask()
SampleTask
def requires():
return CleanTask()
CleanTask
def requires()
return CollectData()
我正在开发一个 Luigi 管道,运行 遇到了一个大问题。似乎没有任何方法可以在同一管道中重用任务。 为了说明我所追求的,请考虑以下工作流程:
Collect_Data → Clean_data → Task_on_data_A → Task_on_data_B
↳ Sample_data → Task_on_data_A → Task_on_data_B
我对 Clean_data
和 Sample_data
执行相同的操作。我想在遵循 DRY 原则的同时在 Luigi 中做这样的事情。现在看来我必须从字面上复制并粘贴我的任务 A 和 B 代码才能执行此操作。
Airflow 有 a way to deal with situations like this. Luigi 有吗?
我认为你应该把箭头画成相反方向的依赖箭头。以这种方式考虑它会有所帮助,因为执行路径由 requires 方法确定。在您的场景中,它非常简单,因为您只有一个具有条件依赖性的 TaskA。
您只需要在开始时再添加一个任务,该任务取决于两个 TaskB 和一个指示 taskb 是否要 运行 样本
伪代码:
ParentTask
def requires():
return [TaskB(sample_type=True),
TaskB(sample_type=False)]
TaskB
def requires():
return TaskA(sample_type)
TaskA
def requires():
if sample_type:
return SampleTask()
else:
return CleanTask()
SampleTask
def requires():
return CleanTask()
CleanTask
def requires()
return CollectData()