运行 taskA 和 运行 带参数的下一个任务,在 luigi 中返回了 taskA
Run taskA and run next tasks with parameters, that returned taskA in luigi
我有任务生成应处理的文件:
class TaskA(luigi.Task):
def run(self):
# some code which generates list of files into output()
def output(self):
return luigi.LocalTarget(filepath='/path/to/process_these_files.json')
我有包装器任务,应该 运行 TaskA,获取参数,运行 处理带有值的任务,我将其放入 process_these_files.json
class RunAll(luigi.WrapperTask):
def requires(self):
files = json.load(TaskA().open('r'))
for file in files:
yield ProcessFileTask(file=file)
知道怎么做吗?
您可以使用动态依赖项。这些是在运行时已知的依赖项。每次你 yield
动态依赖时, run()
方法将保持直到依赖完成。
例如:
class RunAll(luigi.WrapperTask):
def requires(self):
return TaskA()
def run(self):
files = json.load(self.input().open('r'))
for file in files:
yield ProcessFileTask(file=file)
另见 https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies
我有任务生成应处理的文件:
class TaskA(luigi.Task):
def run(self):
# some code which generates list of files into output()
def output(self):
return luigi.LocalTarget(filepath='/path/to/process_these_files.json')
我有包装器任务,应该 运行 TaskA,获取参数,运行 处理带有值的任务,我将其放入 process_these_files.json
class RunAll(luigi.WrapperTask):
def requires(self):
files = json.load(TaskA().open('r'))
for file in files:
yield ProcessFileTask(file=file)
知道怎么做吗?
您可以使用动态依赖项。这些是在运行时已知的依赖项。每次你 yield
动态依赖时, run()
方法将保持直到依赖完成。
例如:
class RunAll(luigi.WrapperTask):
def requires(self):
return TaskA()
def run(self):
files = json.load(self.input().open('r'))
for file in files:
yield ProcessFileTask(file=file)
另见 https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies