Luigi (2.6.1) 因工作分配不均导致工人早死

Workers dying early due to uneven work distribution in Luigi (2.6.1)

我们正在尝试 运行 一个分布在 docker swarm 集群上的简单管道。 luigi worker 被部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,由于没有分配给他们工作,他们开始死亡,所有任务最终都分配给了一个工人。

我们必须在 luigi.cfg 的工人中设置 keep_alive=True 以强制他们不要死亡,但在管道完成后让工人留在身边似乎不是一个好主意。 有没有办法控制工作分配?

我们的测试管道:

class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()

您的问题是 yield 一次满足一个需求的结果,而不是您想要一次 yield 所有需求,如下所示:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs