Luigi 任务进入无限循环

Luigi Tasks going into infinite Loops

我有一个简单的 luigi 任务,它在 运行 上会产生一个不同的参数,如下所示。

import luigi

class ComputeJob(luigi.Task):

   id_parameter = luigi.parameter.IntParameter()

    #run defination
    def run(self):


        print ("\nrunning task {}".format(self.id_parameter))
        #Do some work here

        if self.id_parameter < 10: 
            next_val = self.id_parameter + 1
            yield ComputeJob(id_parameter = next_val)

我期待它 运行 10 次然后退出 运行 但在执行第 10 次迭代后,它开始从头开始重新执行 9 个步骤。因此,任务在第 9 步和第 10 步中不断循环。

所以预期的输出应该是:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10

但我得到的输出是:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...

我在这里错过了什么?

谢谢,Oyshik

如果您想 'do away with the output all together',我建议您查看方法 complete 的文档,如您在后续评论中所述。

另一种选择是为 运行 这个您创建了任意多次的 ComputeJob 任务创建一个额外的包装器任务。

import luigi

class ComputeJob(luigi.Task):
  id_parameter = luigi.parameter.IntParameter()
  done = False

  #run definition
  def run(self):
    print ("\nrunning task {}".format(self.id_parameter))
    #Do some work here
    self.done = True

  def complete(self):
    if self.done:
      return True
    else:
      return False

class RunComputeJobs(luigi.WrapperTask):
  def requires(self):
    for i in range(1,10):
      yield ComputeJob(id_parameter = i)