什么定义了 "complete" Luigi 任务?
What defines a "complete" Luigi task?
我正在构建我的第一个 Luigi 管道,我目前正在构建我的依赖项之前单独测试任务。在测试期间,我正在使用以下主要方法的一个版本来构建任务:
if __name__ == "__main__":
headers = dict()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
docfile = None,
error_limit = 2,
order_fields = 3,
output_file = 'validation_is_us.txt',
header = headers)])
luigi.run()
这是我的 csv_validator 的样子:
class CSVValidator(luigi.Task):
jsonfile = luigi.Parameter()
docfile = luigi.Parameter()
error_limit = luigi.Parameter()
order_fields = luigi.Parameter()
output_file = luigi.Parameter()
header = luigi.DictParameter()
def output(self):
return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())
def run(self):
output_file = self.output().open('w')
files = {}
data = {}
files["jsonfile"] = open(self.jsonfile, 'rb')
files["docfile"] = open(self.docfile, 'rb')
data["error_limit"] = self.error_limit
data["order_fields"] = self.order_fields
r = requests.post(*****~~~~~*****~~~~~,
headers=headers,
data=data, files=files)
task_response = r.text.encode(encoding="UTF-8")
print type(task_response)
print(task_response)
jsontaskdata = json.loads(task_response)
json.dump(jsontaskdata, output_file)
print("validated")
output_file.close()
然而,这个任务实际上从来没有 运行。相反,luigi 中央调度程序声称此任务已经完成:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 CSVValidator(...)
* 1 ran successfully:
- 1 Downloader(...)
这个进度看起来 :) 因为没有失败的任务或缺少依赖项
我创建的其他任务,例如下载器,每次都成功 运行。什么在这里定义了一个完整的任务?没看懂什么意思
感谢您的宝贵时间!
输出方法返回的目标对象定义任务是否完成。
如果某个输出文件已经存在,或者某些其他条件(包括外部资源的可用性),则可能会创建该对象。例如,在 luigi.contrib.esindex.py
中(检查)某个远程集群中 ElasticSearch 索引的存在将创建目标对象并告诉您任务 (CopyIndex) 已完成。
您可能还想看看这个答案:
我正在构建我的第一个 Luigi 管道,我目前正在构建我的依赖项之前单独测试任务。在测试期间,我正在使用以下主要方法的一个版本来构建任务:
if __name__ == "__main__":
headers = dict()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
docfile = None,
error_limit = 2,
order_fields = 3,
output_file = 'validation_is_us.txt',
header = headers)])
luigi.run()
这是我的 csv_validator 的样子:
class CSVValidator(luigi.Task):
jsonfile = luigi.Parameter()
docfile = luigi.Parameter()
error_limit = luigi.Parameter()
order_fields = luigi.Parameter()
output_file = luigi.Parameter()
header = luigi.DictParameter()
def output(self):
return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())
def run(self):
output_file = self.output().open('w')
files = {}
data = {}
files["jsonfile"] = open(self.jsonfile, 'rb')
files["docfile"] = open(self.docfile, 'rb')
data["error_limit"] = self.error_limit
data["order_fields"] = self.order_fields
r = requests.post(*****~~~~~*****~~~~~,
headers=headers,
data=data, files=files)
task_response = r.text.encode(encoding="UTF-8")
print type(task_response)
print(task_response)
jsontaskdata = json.loads(task_response)
json.dump(jsontaskdata, output_file)
print("validated")
output_file.close()
然而,这个任务实际上从来没有 运行。相反,luigi 中央调度程序声称此任务已经完成:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 CSVValidator(...)
* 1 ran successfully:
- 1 Downloader(...)
这个进度看起来 :) 因为没有失败的任务或缺少依赖项
我创建的其他任务,例如下载器,每次都成功 运行。什么在这里定义了一个完整的任务?没看懂什么意思
感谢您的宝贵时间!
输出方法返回的目标对象定义任务是否完成。
如果某个输出文件已经存在,或者某些其他条件(包括外部资源的可用性),则可能会创建该对象。例如,在 luigi.contrib.esindex.py
中(检查)某个远程集群中 ElasticSearch 索引的存在将创建目标对象并告诉您任务 (CopyIndex) 已完成。
您可能还想看看这个答案: