使动态 Luigi 任务的失败变得非关键
Make failure of a dynamic Luigi task non critical
我有一个 luigi 工作流,它通过 ftp 下载一堆大文件并将它们存放在 s3 上。
我有一个任务读取要下载的文件列表,然后创建一堆实际执行下载的任务
这个想法是这个工作流的结果是一个包含已成功下载列表的文件,任何失败的下载都会在下一个 运行 的第二天重新尝试。
问题是,如果任何下载任务失败,则永远不会创建成功的下载列表。
这是因为动态创建的任务成为创建它们并根据其输出编译列表的主任务的要求。
有没有办法使这些下载任务的失败无关紧要,以便在编译列表时减去失败任务的输出?
下面的示例代码,GetFiles 是我们从命令行调用的任务。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
def run(self):
with self.input().open('r') as fileList:
files = json.load(fileList)
tasks = []
taskOutputs = []
for file in files:
task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
tasks.append(task)
taskOutputs.append(task.output())
yield tasks
successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
with self.output().open('w') as out:
json.dump(successfulDownloads, out)
def output(self):
client = S3Client()
return S3Target(path='successfulDownloads.json', client=client)
这个答案可能不正确 - 检查
我已经阅读了几次文档,但没有发现诸如非严重故障之类的迹象。话虽如此,这种行为可以通过覆盖 DownloadFileFromFtp
中的 Task.complete
方法轻松实现,同时仍然能够在 GetFiles.run
.
中使用 DownloadFileFromFtp.output
通过覆盖 return True
,无论下载是否成功,任务 DownloadFileFromFtp
都会成功。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
def complete(self,):
return True
但是请注意,您还可以在该 complete
方法中使用更复杂的逻辑 - 例如仅当任务在运行时遇到特定网络故障时才会失败。
几年后,您一定找到了答案,但这里有一些东西可以提供帮助。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def on_failure(self, exception):
#If the task fails for any reason,
#then just indicate the task as completed.
#From the docs, exception is a string, so you can easily.
if "FileNotFound" in exception:
return self.complete(ignore=True)
return self.complete(ignore=False)
def complete(self, ignore=False):
return ignore
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
我有一个 luigi 工作流,它通过 ftp 下载一堆大文件并将它们存放在 s3 上。
我有一个任务读取要下载的文件列表,然后创建一堆实际执行下载的任务
这个想法是这个工作流的结果是一个包含已成功下载列表的文件,任何失败的下载都会在下一个 运行 的第二天重新尝试。
问题是,如果任何下载任务失败,则永远不会创建成功的下载列表。
这是因为动态创建的任务成为创建它们并根据其输出编译列表的主任务的要求。
有没有办法使这些下载任务的失败无关紧要,以便在编译列表时减去失败任务的输出?
下面的示例代码,GetFiles 是我们从命令行调用的任务。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
def run(self):
with self.input().open('r') as fileList:
files = json.load(fileList)
tasks = []
taskOutputs = []
for file in files:
task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
tasks.append(task)
taskOutputs.append(task.output())
yield tasks
successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
with self.output().open('w') as out:
json.dump(successfulDownloads, out)
def output(self):
client = S3Client()
return S3Target(path='successfulDownloads.json', client=client)
这个答案可能不正确 - 检查
我已经阅读了几次文档,但没有发现诸如非严重故障之类的迹象。话虽如此,这种行为可以通过覆盖 DownloadFileFromFtp
中的 Task.complete
方法轻松实现,同时仍然能够在 GetFiles.run
.
DownloadFileFromFtp.output
通过覆盖 return True
,无论下载是否成功,任务 DownloadFileFromFtp
都会成功。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
def complete(self,):
return True
但是请注意,您还可以在该 complete
方法中使用更复杂的逻辑 - 例如仅当任务在运行时遇到特定网络故障时才会失败。
几年后,您一定找到了答案,但这里有一些东西可以提供帮助。
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def on_failure(self, exception):
#If the task fails for any reason,
#then just indicate the task as completed.
#From the docs, exception is a string, so you can easily.
if "FileNotFound" in exception:
return self.complete(ignore=True)
return self.complete(ignore=False)
def complete(self, ignore=False):
return ignore
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)