如何 运行 并行 Luigi Pipeline 实例:Pid 设置已经 运行ning
How to run parallel instances of a Luigi Pipeline : Pid set already running
我有一个简单的管道。
我想用 ID 2381 启动它一次,然后当第一个工作是 运行ning 我想用 ID 231 启动第二个 运行。第一个 运行 按预期完成。
第二个运行returns这个回复
Pid(s) set([10362]) already running
Process finished with exit code 0
我正在这样开始 运行
运行一个:
luigi.run(
cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
main_task_cls=TaskTwo()
)
运行二:
luigi.run(
cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
main_task_cls=TaskTwo()
)
每个任务都有一个由 luigi 的 task_id_str(...) 方法生成的唯一 ID。为什么当 luigi.paramater、TaskTwo-id 和 MockTarget 文件都不同时,luigi 认为任务已经 运行ning?
管道代码:
import time
import uuid
from luigi.mock import MockTarget
import luigi
class TaskOne(luigi.Task):
run_id = luigi.Parameter()
def output(self):
return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)
def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()
class TaskTwo(luigi.Task):
id = luigi.Parameter(default=uuid.uuid4().__str__())
def output(self):
return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)
def requires(self):
return TaskOne(self.id)
def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()
看起来这可能是因为您没有连接到调度程序服务器,所以它试图启动一个调度程序进程两次。你是 运行ning luigid 吗?
我能够在命令行中将您的代码 运行 如下所示。首先,我创建了一个目录并将您的代码放在一个名为 luigitest.py 的文件中(减去 luigi.run() 命令)。我将目录更改为我创建的目录。然后我运行:
luigid --background --pidfile ./luigid.pid --logdir . --state-path .
然后我在同一目录中打开了第二个终端。在第一个中我 运行:
PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13822 --TaskTwo-id 2381 --local-scheduler
第二个我运行(大约一秒后):
PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13823 --TaskTwo-id 2382 --local-scheduler
这两个输出"Hello World!"
我有一个简单的管道。
我想用 ID 2381 启动它一次,然后当第一个工作是 运行ning 我想用 ID 231 启动第二个 运行。第一个 运行 按预期完成。
第二个运行returns这个回复
Pid(s) set([10362]) already running
Process finished with exit code 0
我正在这样开始 运行
运行一个:
luigi.run(
cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
main_task_cls=TaskTwo()
)
运行二:
luigi.run(
cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
main_task_cls=TaskTwo()
)
每个任务都有一个由 luigi 的 task_id_str(...) 方法生成的唯一 ID。为什么当 luigi.paramater、TaskTwo-id 和 MockTarget 文件都不同时,luigi 认为任务已经 运行ning?
管道代码:
import time
import uuid
from luigi.mock import MockTarget
import luigi
class TaskOne(luigi.Task):
run_id = luigi.Parameter()
def output(self):
return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)
def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()
class TaskTwo(luigi.Task):
id = luigi.Parameter(default=uuid.uuid4().__str__())
def output(self):
return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)
def requires(self):
return TaskOne(self.id)
def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()
看起来这可能是因为您没有连接到调度程序服务器,所以它试图启动一个调度程序进程两次。你是 运行ning luigid 吗?
我能够在命令行中将您的代码 运行 如下所示。首先,我创建了一个目录并将您的代码放在一个名为 luigitest.py 的文件中(减去 luigi.run() 命令)。我将目录更改为我创建的目录。然后我运行:
luigid --background --pidfile ./luigid.pid --logdir . --state-path .
然后我在同一目录中打开了第二个终端。在第一个中我 运行:
PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13822 --TaskTwo-id 2381 --local-scheduler
第二个我运行(大约一秒后):
PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13823 --TaskTwo-id 2382 --local-scheduler
这两个输出"Hello World!"