如何使用 Luigi 处理输出

How to handle output with Luigi

我正在尝试了解 luigi 的工作原理,我明白了,但实际实现起来有点困难;) 这就是我所拥有的:

class MyTask(luigi.Task):

    x = luigi.IntParameter()

    def requires(self):
        return OtherTask(self.x)

    def run(self):
        print(self.x)

class OtherTask(luigi.Task):

    x = luigi.IntParameter()

    def run(self):
        y = self.x + 1
        print(y)

这失败了 RuntimeError: Unfulfilled dependency at run time: OtherTask_3_5862334ee2。我认为我需要使用 def output(self): 生成输出来解决这个 issue\feature。而且我无法理解如何在不写入文件的情况下产生合理的输出,比如:

def output(self):
    return luigi.LocalTarget('words.txt')

def run(self):

    words = [
            'apple',
            'banana',
            'grapefruit'
            ]

    with self.output().open('w') as f:
        for word in words:
            f.write('{word}\n'.format(word=word))

我已经尝试阅读文档,但我根本无法理解输出背后的概念。如果我只需要输出到屏幕怎么办。如果我需要将对象输出到另一个任务怎么办?谢谢!

What if I need to output an object to another task?

Luigi 任务可以 运行 在不同的进程中。因此,如果你想交换一个结果的对象,你通常必须写入磁盘、数据库、泡菜或一些允许在进程之间交换数据的外部机制(并且可以验证其存在)一个任务。

与编写需要目标的 output() 方法相反,您还可以覆盖 complete() 方法,您可以在其中编写允许任务被视为完成的任何自定义逻辑。