为 WrapperTask 重试 .complete()
Retry .complete() for WrapperTask
我正在使用 Luigi 来 运行 几个任务,然后我需要将输出批量传输到标准化的文件位置。我写了一个 WrapperTask,用一个重写的 complete()
方法来做到这一点:
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
但我无法在进程实际完成时调用 complete()
的条件部分。
我认为这是因为 asynchronous behavior 其他人指出的,但我不确定如何解决它。
我已经尝试 运行ning Luigi 使用这些命令行参数:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
但这似乎无法正常工作。这是处理此类任务的正确方法吗?
此外,我很好奇 — 有没有人使用过 --worker-retry-external-task
命令?我有点难以理解。
def _is_external(task):
return task.run is None or task.run == NotImplemented
调用 以确定 LuigiTask 是否具有 run()
方法,而 WrapperTask
则没有。因此,我希望 --retry-external-task
标志为此重试 complete()
直到它完成,从而执行该操作。然而,只是在解释器中玩耍让我相信:
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
此代码片段未按预期运行。
我在基地外吗?
我仍然认为覆盖 .complete()
理论上应该能够做到这一点,我仍然不确定为什么不能,但如果您只是在寻找批量传输的方法运行 一个过程之后的文件,一个可行的解决方案是在 .run()
方法中进行传输:
def run(self):
logger.info('transferring into current directory')
self.client.copy('current-old','current')
我正在使用 Luigi 来 运行 几个任务,然后我需要将输出批量传输到标准化的文件位置。我写了一个 WrapperTask,用一个重写的 complete()
方法来做到这一点:
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
但我无法在进程实际完成时调用 complete()
的条件部分。
我认为这是因为 asynchronous behavior 其他人指出的,但我不确定如何解决它。
我已经尝试 运行ning Luigi 使用这些命令行参数:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
但这似乎无法正常工作。这是处理此类任务的正确方法吗?
此外,我很好奇 — 有没有人使用过 --worker-retry-external-task
命令?我有点难以理解。
def _is_external(task):
return task.run is None or task.run == NotImplemented
调用 以确定 LuigiTask 是否具有 run()
方法,而 WrapperTask
则没有。因此,我希望 --retry-external-task
标志为此重试 complete()
直到它完成,从而执行该操作。然而,只是在解释器中玩耍让我相信:
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
此代码片段未按预期运行。
我在基地外吗?
我仍然认为覆盖 .complete()
理论上应该能够做到这一点,我仍然不确定为什么不能,但如果您只是在寻找批量传输的方法运行 一个过程之后的文件,一个可行的解决方案是在 .run()
方法中进行传输:
def run(self):
logger.info('transferring into current directory')
self.client.copy('current-old','current')