Luigi (2.6.1) 因工作分配不均导致工人早死
Workers dying early due to uneven work distribution in Luigi (2.6.1)
我们正在尝试 运行 一个分布在 docker swarm 集群上的简单管道。 luigi worker 被部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,由于没有分配给他们工作,他们开始死亡,所有任务最终都分配给了一个工人。
我们必须在 luigi.cfg 的工人中设置 keep_alive=True 以强制他们不要死亡,但在管道完成后让工人留在身边似乎不是一个好主意。
有没有办法控制工作分配?
我们的测试管道:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i, self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
您的问题是 yield
一次满足一个需求的结果,而不是您想要一次 yield
所有需求,如下所示:
def requires(self):
reqs = []
for i in range(self.tasks):
reqs.append(RunExampleTask(i, self.sleep_time))
yield reqs
我们正在尝试 运行 一个分布在 docker swarm 集群上的简单管道。 luigi worker 被部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,由于没有分配给他们工作,他们开始死亡,所有任务最终都分配给了一个工人。
我们必须在 luigi.cfg 的工人中设置 keep_alive=True 以强制他们不要死亡,但在管道完成后让工人留在身边似乎不是一个好主意。 有没有办法控制工作分配?
我们的测试管道:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i, self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
您的问题是 yield
一次满足一个需求的结果,而不是您想要一次 yield
所有需求,如下所示:
def requires(self):
reqs = []
for i in range(self.tasks):
reqs.append(RunExampleTask(i, self.sleep_time))
yield reqs