如何将多个参数传递给 Luigi 子任务?
How do you pass multiple arguments to a Luigi subtask?
我有一个 Luigi 任务 requires
一个子任务。子任务取决于父任务(即执行 require
ing 的任务)传递的参数。我知道你可以通过设置来指定子任务可以使用的参数...
def requires(self):
return subTask(some_parameter)
...然后在子任务上,通过设置接收参数...
x = luigi.Parameter()
虽然这似乎只允许您传递一个参数。通过任意数量的参数发送我想要的任何类型的最佳方法是什么?我真的想要这样的东西:
class parentTask(luigi.Task):
def requires(self):
return subTask({'file_postfix': 'foo',
'file_content': 'bar'
})
def run(self):
return
class subTask(luigi.Task):
params = luigi.DictParameter()
def output(self):
return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))
def run(self):
with self.output().open('w') as f:
f.write(self.params['file_content'])
如你所见,我尝试使用 luigi.DictParameter
而不是直接的 luigi.Parameter
但是当我 运行 上述时,我从 Luigi 深处的某个地方得到 TypeError: unhashable type: 'dict'
。
运行 Python 2.7.11,路易吉 2.1.1
好的,所以我发现这在 python 3.5 中按预期工作(并且问题在 3.4 中仍然存在)。
今天没时间追根究底,所以没有进一步的细节。
What is the best way to send through an arbitrary number of
parameters, of whatever types I want?
最好的方法是使用命名参数,例如
#in requires
return MySampleSubTask(x=local_x, y=local_y)
class MySampleSubTask(luigi.Task):
x = luigi.Parameter()
y = luigi.Parameter()
What is the best way to send through an arbitrary number of
parameters, of whatever types I want?
你可以按照这个例子。您将有一个占位符来定义传递所需的所有参数(ParameterCollector)。如果在许多子任务的情况下需要将参数传递给子任务,这将避免在每个单任务上定义参数。
class ParameterCollector(object):
param1 = luigi.Parameter()
param2 = luigi.Parameter()
def collect_params(self):
return {'param1': self.param1, 'param2': self.param2}
class TaskB(ParameterCollector, luigi.Task):
def requires(self):
return []
def output(self):
return luigi.LocalTarget('/tmp/task1_success')
def run(self):
with self.output().open('w') as f:
f.write(self.param1)
class TaskA(ParameterCollector, luigi.Task):
def requires(self):
a = TaskB(**self.collect_params())
print(a)
return a
def output(self):
return luigi.LocalTarget('/tmp/task2_success')
def run(self):
with self.output().open('w') as f:
f.write(str([self.param1, self.param2]))
if __name__ == '__main__':
luigi.run()
您可以使用以下内容。
class DownloadFile(luigi.Task):
"""This is file download task used to get
the file from file parser"""
id = luigi.Parameter(default=uuid.uuid4().__str__(), positional=True)
#master_task_id = luigi.Parameter(positional=True)
pipeline_data = luigi.DictParameter(default={}, significant=False,
visibility=luigi.parameter.ParameterVisibility.PRIVATE,
positional=True)
def output(self):
# add the file output here
return MockTarget("FileDownload", mirror_on_stderr=True)
def run(self):
time.sleep(3)
parsed_data = self.pipeline_data['pipeline_data'] #this helps in parsing the dict
我有一个 Luigi 任务 requires
一个子任务。子任务取决于父任务(即执行 require
ing 的任务)传递的参数。我知道你可以通过设置来指定子任务可以使用的参数...
def requires(self):
return subTask(some_parameter)
...然后在子任务上,通过设置接收参数...
x = luigi.Parameter()
虽然这似乎只允许您传递一个参数。通过任意数量的参数发送我想要的任何类型的最佳方法是什么?我真的想要这样的东西:
class parentTask(luigi.Task):
def requires(self):
return subTask({'file_postfix': 'foo',
'file_content': 'bar'
})
def run(self):
return
class subTask(luigi.Task):
params = luigi.DictParameter()
def output(self):
return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))
def run(self):
with self.output().open('w') as f:
f.write(self.params['file_content'])
如你所见,我尝试使用 luigi.DictParameter
而不是直接的 luigi.Parameter
但是当我 运行 上述时,我从 Luigi 深处的某个地方得到 TypeError: unhashable type: 'dict'
。
运行 Python 2.7.11,路易吉 2.1.1
好的,所以我发现这在 python 3.5 中按预期工作(并且问题在 3.4 中仍然存在)。
今天没时间追根究底,所以没有进一步的细节。
What is the best way to send through an arbitrary number of parameters, of whatever types I want?
最好的方法是使用命名参数,例如
#in requires
return MySampleSubTask(x=local_x, y=local_y)
class MySampleSubTask(luigi.Task):
x = luigi.Parameter()
y = luigi.Parameter()
What is the best way to send through an arbitrary number of parameters, of whatever types I want?
你可以按照这个例子。您将有一个占位符来定义传递所需的所有参数(ParameterCollector)。如果在许多子任务的情况下需要将参数传递给子任务,这将避免在每个单任务上定义参数。
class ParameterCollector(object):
param1 = luigi.Parameter()
param2 = luigi.Parameter()
def collect_params(self):
return {'param1': self.param1, 'param2': self.param2}
class TaskB(ParameterCollector, luigi.Task):
def requires(self):
return []
def output(self):
return luigi.LocalTarget('/tmp/task1_success')
def run(self):
with self.output().open('w') as f:
f.write(self.param1)
class TaskA(ParameterCollector, luigi.Task):
def requires(self):
a = TaskB(**self.collect_params())
print(a)
return a
def output(self):
return luigi.LocalTarget('/tmp/task2_success')
def run(self):
with self.output().open('w') as f:
f.write(str([self.param1, self.param2]))
if __name__ == '__main__':
luigi.run()
您可以使用以下内容。
class DownloadFile(luigi.Task):
"""This is file download task used to get
the file from file parser"""
id = luigi.Parameter(default=uuid.uuid4().__str__(), positional=True)
#master_task_id = luigi.Parameter(positional=True)
pipeline_data = luigi.DictParameter(default={}, significant=False,
visibility=luigi.parameter.ParameterVisibility.PRIVATE,
positional=True)
def output(self):
# add the file output here
return MockTarget("FileDownload", mirror_on_stderr=True)
def run(self):
time.sleep(3)
parsed_data = self.pipeline_data['pipeline_data'] #this helps in parsing the dict