Luigi 任务方法执行顺序

Luigi task methods execution order

Luigi 执行方法的顺序是什么(运行,输出,要求)。我知道要求是 运行 作为检查任务 DAG 有效性的第一个检查,但不应该在 运行()?

之后输出 运行

我实际上正在尝试等待 运行 中的 kafka 消息,并基于该消息触发一系列其他任务和 return LocalTarget。像这样:

def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)

        with self.conn.cursor() as cursor:
              all_accounts = cursor.execute('select domainname from tblaccountinfo;')
        for each in all_accounts:
            open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)

但是,我收到一条错误消息:

Exception: path or is_tmp must be set

return LocalTarget(self.path) 行。为什么 luigi 尝试执行 def output() 方法直到 def 运行() 完成?

当你运行一个管道(即一个或多个任务)时,Luigi首先检查它的输出目标是否已经存在,如果不存在,则将任务调度到运行。

Luigi 如何知道它必须检查哪些目标?它只会让他们调用您任务的 output() 方法。

不是执行顺序。 Luigi 将在使任务进入挂起状态之前检查我们要使用 output() 方法创建的文件是否存在。因此,如果您正在使用任何变量,它希望解决这些变量。在这里,您使用的是 self.path,它是在 运行 方法中创建的。这就是错误的原因。

您要么必须在 class 本身中创建路径并在输出方法中使用,要么在输出方法本身中创建它们并在 运行 方法中使用它们,如下所示

self.output().open('w').close()