如何忽略在另一个任务的 运行() 中触发的 Luigi 任务失败
How to ignore failures on Luigi tasks triggered inside another task's run()
考虑以下任务:
import luigi
class YieldFailTaskInBatches(luigi.Task):
def run(self):
for i in range(5):
yield [
FailTask(i, j)
for j in range(2)
]
class YieldAllFailTasksAtOnce(luigi.Task):
def run(self):
yield [
FailTask(i, j)
for j in range(2)
for i in range(5)
]
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
如果 j > 0
,FailTask
失败。 YieldFailTaskInBatches
在 for 循环中多次生成 FailTask
,而 YieldAllFailTasksAtOnce
生成数组中的所有任务。
如果我 运行 YieldFailTaskInBatches
,Luigi 运行s 任务在第一个循环中产生,并且当其中一个失败时 (i = 0, j = 1
),Luigi 不会剩下的就不交了。如果我 运行 YieldAllFailTasksAtOnce
,Luigi 运行 按预期执行所有任务。
我的问题是:如何让 Luigi 将剩余的任务 运行 保留在 YieldFailTasksInBatches
上,即使某些任务失败了?有可能吗?
我问的原因是我有大约 40 万个任务要触发。我不想一次触发它们,因为这会让 Luigi 花费太多时间来构建每个任务的要求(它们可以有 1 到 400 个要求。我目前的解决方案是分批生成它们,一次生成几个,但是如果其中任何一个失败,任务就会停止,其余的不会生成。
好像this issue实现了就可以解决这个问题,但是我想知道有没有别的办法。
这非常骇人听闻,但它应该可以满足您的要求:
class YieldAll(luigi.Task):
def run(self):
errors = list()
for i in range(5):
for j in range(2):
try:
FailTask(i, j).run()
except Exception as e:
errors.append(e)
if errors:
raise ValueError(f' all traceback: {errors}')
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
所以基本上你是在 luigi 上下文之外 运行ning 任务。除非你输出一个目标,否则 luigi 永远不会知道任务是否有 运行 或没有。
luigi 唯一知道的任务是 YieldAll。如果任何 YieldAll 创建错误,代码将捕获它并将 YieldAll 任务设置为失败状态。
考虑以下任务:
import luigi
class YieldFailTaskInBatches(luigi.Task):
def run(self):
for i in range(5):
yield [
FailTask(i, j)
for j in range(2)
]
class YieldAllFailTasksAtOnce(luigi.Task):
def run(self):
yield [
FailTask(i, j)
for j in range(2)
for i in range(5)
]
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
如果 j > 0
,FailTask
失败。 YieldFailTaskInBatches
在 for 循环中多次生成 FailTask
,而 YieldAllFailTasksAtOnce
生成数组中的所有任务。
如果我 运行 YieldFailTaskInBatches
,Luigi 运行s 任务在第一个循环中产生,并且当其中一个失败时 (i = 0, j = 1
),Luigi 不会剩下的就不交了。如果我 运行 YieldAllFailTasksAtOnce
,Luigi 运行 按预期执行所有任务。
我的问题是:如何让 Luigi 将剩余的任务 运行 保留在 YieldFailTasksInBatches
上,即使某些任务失败了?有可能吗?
我问的原因是我有大约 40 万个任务要触发。我不想一次触发它们,因为这会让 Luigi 花费太多时间来构建每个任务的要求(它们可以有 1 到 400 个要求。我目前的解决方案是分批生成它们,一次生成几个,但是如果其中任何一个失败,任务就会停止,其余的不会生成。
好像this issue实现了就可以解决这个问题,但是我想知道有没有别的办法。
这非常骇人听闻,但它应该可以满足您的要求:
class YieldAll(luigi.Task):
def run(self):
errors = list()
for i in range(5):
for j in range(2):
try:
FailTask(i, j).run()
except Exception as e:
errors.append(e)
if errors:
raise ValueError(f' all traceback: {errors}')
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
所以基本上你是在 luigi 上下文之外 运行ning 任务。除非你输出一个目标,否则 luigi 永远不会知道任务是否有 运行 或没有。
luigi 唯一知道的任务是 YieldAll。如果任何 YieldAll 创建错误,代码将捕获它并将 YieldAll 任务设置为失败状态。