Luigi 任务进入无限循环
Luigi Tasks going into infinite Loops
我有一个简单的 luigi 任务,它在 运行 上会产生一个不同的参数,如下所示。
import luigi
class ComputeJob(luigi.Task):
id_parameter = luigi.parameter.IntParameter()
#run defination
def run(self):
print ("\nrunning task {}".format(self.id_parameter))
#Do some work here
if self.id_parameter < 10:
next_val = self.id_parameter + 1
yield ComputeJob(id_parameter = next_val)
我期待它 运行 10 次然后退出 运行 但在执行第 10 次迭代后,它开始从头开始重新执行 9 个步骤。因此,任务在第 9 步和第 10 步中不断循环。
所以预期的输出应该是:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
但我得到的输出是:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...
我在这里错过了什么?
谢谢,Oyshik
如果您想 'do away with the output all together',我建议您查看方法 complete 的文档,如您在后续评论中所述。
另一种选择是为 运行 这个您创建了任意多次的 ComputeJob 任务创建一个额外的包装器任务。
import luigi
class ComputeJob(luigi.Task):
id_parameter = luigi.parameter.IntParameter()
done = False
#run definition
def run(self):
print ("\nrunning task {}".format(self.id_parameter))
#Do some work here
self.done = True
def complete(self):
if self.done:
return True
else:
return False
class RunComputeJobs(luigi.WrapperTask):
def requires(self):
for i in range(1,10):
yield ComputeJob(id_parameter = i)
我有一个简单的 luigi 任务,它在 运行 上会产生一个不同的参数,如下所示。
import luigi
class ComputeJob(luigi.Task):
id_parameter = luigi.parameter.IntParameter()
#run defination
def run(self):
print ("\nrunning task {}".format(self.id_parameter))
#Do some work here
if self.id_parameter < 10:
next_val = self.id_parameter + 1
yield ComputeJob(id_parameter = next_val)
我期待它 运行 10 次然后退出 运行 但在执行第 10 次迭代后,它开始从头开始重新执行 9 个步骤。因此,任务在第 9 步和第 10 步中不断循环。
所以预期的输出应该是:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
但我得到的输出是:
running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...
我在这里错过了什么?
谢谢,Oyshik
如果您想 'do away with the output all together',我建议您查看方法 complete 的文档,如您在后续评论中所述。
另一种选择是为 运行 这个您创建了任意多次的 ComputeJob 任务创建一个额外的包装器任务。
import luigi
class ComputeJob(luigi.Task):
id_parameter = luigi.parameter.IntParameter()
done = False
#run definition
def run(self):
print ("\nrunning task {}".format(self.id_parameter))
#Do some work here
self.done = True
def complete(self):
if self.done:
return True
else:
return False
class RunComputeJobs(luigi.WrapperTask):
def requires(self):
for i in range(1,10):
yield ComputeJob(id_parameter = i)