什么定义了 "complete" Luigi 任务?

What defines a "complete" Luigi task?

我正在构建我的第一个 Luigi 管道,我目前正在构建我的依赖项之前单独测试任务。在测试期间,我正在使用以下主要方法的一个版本来构建任务:

if __name__ == "__main__":

    headers = dict()
    headers["Content-Type"] = "application/json"
    headers["Accept"] = "application/json"

    luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
                docfile = None,
                error_limit = 2,
                order_fields = 3,
                output_file = 'validation_is_us.txt',
                header = headers)])

    luigi.run()

这是我的 csv_validator 的样子:

class CSVValidator(luigi.Task):
    jsonfile = luigi.Parameter()
    docfile = luigi.Parameter()
    error_limit = luigi.Parameter()
    order_fields = luigi.Parameter()
    output_file = luigi.Parameter()
    header = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())

    def run(self):
        output_file = self.output().open('w')
        files = {}
        data = {}
        files["jsonfile"] = open(self.jsonfile, 'rb')
        files["docfile"] = open(self.docfile, 'rb')
        data["error_limit"] = self.error_limit
        data["order_fields"] = self.order_fields
        r = requests.post(*****~~~~~*****~~~~~,
                      headers=headers,
                      data=data, files=files)
        task_response = r.text.encode(encoding="UTF-8")
        print type(task_response)
        print(task_response)
        jsontaskdata = json.loads(task_response)
        json.dump(jsontaskdata, output_file)
        print("validated")
        output_file.close()

然而,这个任务实际上从来没有 运行。相反,luigi 中央调度程序声称此任务已经完成:

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 CSVValidator(...)
* 1 ran successfully:
    - 1 Downloader(...)

这个进度看起来 :) 因为没有失败的任务或缺少依赖项

我创建的其他任务,例如下载器,每次都成功 运行。什么在这里定义了一个完整的任务?没看懂什么意思

感谢您的宝贵时间!

输出方法返回的目标对象定义任务是否完成。

如果某个输出文件已经存在,或者某些其他条件(包括外部资源的可用性),则可能会创建该对象。例如,在 luigi.contrib.esindex.py 中(检查)某个远程集群中 ElasticSearch 索引的存在将创建目标对象并告诉您任务 (CopyIndex) 已完成。

您可能还想看看这个答案:

和这个讨论:https://github.com/spotify/luigi/issues/595