为 WrapperTask 重试 .complete()

Retry .complete() for WrapperTask

我正在使用 Luigi 来 运行 几个任务,然后我需要将输出批量传输到标准化的文件位置。我写了一个 WrapperTask,用一个重写的 complete() 方法来做到这一点:

from luigi.task import flatten

class TaskX(luigi.WrapperTask):
    date = luigi.DateParameter()
    client = luigi.s3.S3Client()

    def requires(self):
        yield TaskA(date=self.date)     
        yield TaskB(date=self.date)

    def complete(self):
        tasks_complete = all(r.complete() for r in flatten(self.requires())) 

        ## at the end of everything, batch copy the files
        if tasks_complete:
            self.client.copy('current-old', 'current')
            return True
        else:
            return False


if __name__ == "__main__":
    luigi.run()

但我无法在进程实际完成时调用 complete() 的条件部分。

我认为这是因为 asynchronous behavior 其他人指出的,但我不确定如何解决它。

我已经尝试 运行ning Luigi 使用这些命令行参数:

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task

但这似乎无法正常工作。这是处理此类任务的正确方法吗?

此外,我很好奇 — 有没有人使用过 --worker-retry-external-task 命令?我有点难以理解。

source code,

def _is_external(task):
    return task.run is None or task.run == NotImplemented
调用

以确定 LuigiTask 是否具有 run() 方法,而 WrapperTask 则没有。因此,我希望 --retry-external-task 标志为此重试 complete() 直到它完成,从而执行该操作。然而,只是在解释器中玩耍让我相信:

>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
    <bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True

此代码片段未按预期运行。

我在基地外吗?

我仍然认为覆盖 .complete() 理论上应该能够做到这一点,我仍然不确定为什么不能,但如果您只是在寻找批量传输的方法运行 一个过程之后的文件,一个可行的解决方案是在 .run() 方法中进行传输:

def run(self):
    logger.info('transferring into current directory')
    self.client.copy('current-old','current')