如何 运行 一个接一个地完成任务而不需要它
How to run a task after another is complete without requiring it
在 luigi
中,我正在尝试建立一个如下所示的工作流程:
1) 解析数据
2) 对解析后的数据进行计算
3)Tar一起计算数据
这些操作需要按顺序完成,我有几个这样设置的工作流程。然而,即使要求很容易在 1 和 2 之间完成(2 要求 1),我不想明确地要求 3 要求 2,否则我不能在其他工作流程中重复使用该任务。那么,我该怎么做呢?
我知道使用动态依赖项是有效的,但它的预期用途是在您事先不知道依赖项列表的情况下使用,而在这种情况下我知道。它还要求我创建一个 Workflow
任务,按顺序生成任务 2 和 3,而不是仅仅安排它们。
我尝试过的一个可能的解决方案是制作一个可以将任务作为参数的超级 class,但不幸的是,这不起作用,因为 classes 不能作为参数,只能原语+日期。那么,正确的方法是什么?
我在下面包含了当前方法:
class TaskOne(luigi.Task):
def output(self):
return luigi.LocalTarget("...")
def run(self):
with self.output().open('w') as out_file:
// Do parsing
class TaskTwo(luigi.Task):
def requires(self):
return TaskOne()
def output(self):
return luigi.LocalTarget(".../success.txt")
def run(self):
with self.input().open('r') as in_file:
// Do calculations
with self.output().open('w') as out_file:
out_file.write("1")
class TarTask(luigi.Task):
directory = luigi.Parameter()
def output(self):
return luigi.LocalTarget(directory+".tar.xz")
def run(self):
// Tar to temporary tar target then mv file to output location
class Workflow(luigi.Task):
def output(self):
return luigi.LocalTarget(".../wf_success.txt")
def run(self):
yield TaskTwo()
yield TarTask(directory)
with self.output().open('w') as out_file:
out_file.write("1")
所以,我想出了一种方法来解决这个问题。您可以像这样动态设置任务实例的 requires 方法:
from types import MethodType
def sequence_tasks(tasks):
prev_task = None
for task in tasks:
def requires_method(self):
return self.prev_task
task.requires = MethodType(requires_method, task)
setattr(task, "prev_task", prev_task)
prev_task = task
return prev_task
但是,如果序列中的任务确实需要其他任务,则此方法无效。为此,您需要制作一个更复杂的 requires_method
来调用旧需求和 appends/sets 一个属性来添加您的新需求任务。
在 luigi
中,我正在尝试建立一个如下所示的工作流程:
1) 解析数据
2) 对解析后的数据进行计算
3)Tar一起计算数据
这些操作需要按顺序完成,我有几个这样设置的工作流程。然而,即使要求很容易在 1 和 2 之间完成(2 要求 1),我不想明确地要求 3 要求 2,否则我不能在其他工作流程中重复使用该任务。那么,我该怎么做呢?
我知道使用动态依赖项是有效的,但它的预期用途是在您事先不知道依赖项列表的情况下使用,而在这种情况下我知道。它还要求我创建一个 Workflow
任务,按顺序生成任务 2 和 3,而不是仅仅安排它们。
我尝试过的一个可能的解决方案是制作一个可以将任务作为参数的超级 class,但不幸的是,这不起作用,因为 classes 不能作为参数,只能原语+日期。那么,正确的方法是什么?
我在下面包含了当前方法:
class TaskOne(luigi.Task):
def output(self):
return luigi.LocalTarget("...")
def run(self):
with self.output().open('w') as out_file:
// Do parsing
class TaskTwo(luigi.Task):
def requires(self):
return TaskOne()
def output(self):
return luigi.LocalTarget(".../success.txt")
def run(self):
with self.input().open('r') as in_file:
// Do calculations
with self.output().open('w') as out_file:
out_file.write("1")
class TarTask(luigi.Task):
directory = luigi.Parameter()
def output(self):
return luigi.LocalTarget(directory+".tar.xz")
def run(self):
// Tar to temporary tar target then mv file to output location
class Workflow(luigi.Task):
def output(self):
return luigi.LocalTarget(".../wf_success.txt")
def run(self):
yield TaskTwo()
yield TarTask(directory)
with self.output().open('w') as out_file:
out_file.write("1")
所以,我想出了一种方法来解决这个问题。您可以像这样动态设置任务实例的 requires 方法:
from types import MethodType
def sequence_tasks(tasks):
prev_task = None
for task in tasks:
def requires_method(self):
return self.prev_task
task.requires = MethodType(requires_method, task)
setattr(task, "prev_task", prev_task)
prev_task = task
return prev_task
但是,如果序列中的任务确实需要其他任务,则此方法无效。为此,您需要制作一个更复杂的 requires_method
来调用旧需求和 appends/sets 一个属性来添加您的新需求任务。