Luigi 任务方法执行顺序
Luigi task methods execution order
Luigi 执行方法的顺序是什么(运行,输出,要求)。我知道要求是 运行 作为检查任务 DAG 有效性的第一个检查,但不应该在 运行()?
之后输出 运行
我实际上正在尝试等待 运行 中的 kafka 消息,并基于该消息触发一系列其他任务和 return LocalTarget。像这样:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
但是,我收到一条错误消息:
Exception: path or is_tmp must be set
在 return LocalTarget(self.path) 行。为什么 luigi 尝试执行 def output() 方法直到 def 运行() 完成?
当你运行一个管道(即一个或多个任务)时,Luigi首先检查它的输出目标是否已经存在,如果不存在,则将任务调度到运行。
Luigi 如何知道它必须检查哪些目标?它只会让他们调用您任务的 output()
方法。
不是执行顺序。 Luigi 将在使任务进入挂起状态之前检查我们要使用 output() 方法创建的文件是否存在。因此,如果您正在使用任何变量,它希望解决这些变量。在这里,您使用的是 self.path,它是在 运行 方法中创建的。这就是错误的原因。
您要么必须在 class 本身中创建路径并在输出方法中使用,要么在输出方法本身中创建它们并在 运行 方法中使用它们,如下所示
self.output().open('w').close()
Luigi 执行方法的顺序是什么(运行,输出,要求)。我知道要求是 运行 作为检查任务 DAG 有效性的第一个检查,但不应该在 运行()?
之后输出 运行我实际上正在尝试等待 运行 中的 kafka 消息,并基于该消息触发一系列其他任务和 return LocalTarget。像这样:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
但是,我收到一条错误消息:
Exception: path or is_tmp must be set
在 return LocalTarget(self.path) 行。为什么 luigi 尝试执行 def output() 方法直到 def 运行() 完成?
当你运行一个管道(即一个或多个任务)时,Luigi首先检查它的输出目标是否已经存在,如果不存在,则将任务调度到运行。
Luigi 如何知道它必须检查哪些目标?它只会让他们调用您任务的 output()
方法。
不是执行顺序。 Luigi 将在使任务进入挂起状态之前检查我们要使用 output() 方法创建的文件是否存在。因此,如果您正在使用任何变量,它希望解决这些变量。在这里,您使用的是 self.path,它是在 运行 方法中创建的。这就是错误的原因。
您要么必须在 class 本身中创建路径并在输出方法中使用,要么在输出方法本身中创建它们并在 运行 方法中使用它们,如下所示
self.output().open('w').close()