如何将多个参数传递给 Luigi 子任务?

How do you pass multiple arguments to a Luigi subtask?

我有一个 Luigi 任务 requires 一个子任务。子任务取决于父任务(即执行 requireing 的任务)传递的参数。我知道你可以通过设置来指定子任务可以使用的参数...

def requires(self):
    return subTask(some_parameter)

...然后在子任务上,通过设置接收参数...

x = luigi.Parameter()

虽然这似乎只允许您传递一个参数。通过任意数量的参数发送我想要的任何类型的最佳方法是什么?我真的想要这样的东西:

class parentTask(luigi.Task):

    def requires(self):
        return subTask({'file_postfix': 'foo',
                        'file_content': 'bar'
        })

    def run(self):
        return


class subTask(luigi.Task):
    params = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.params['file_content'])

如你所见,我尝试使用 luigi.DictParameter 而不是直接的 luigi.Parameter 但是当我 运行 上述时,我从 Luigi 深处的某个地方得到 TypeError: unhashable type: 'dict'

运行 Python 2.7.11,路易吉 2.1.1

好的,所以我发现这在 python 3.5 中按预期工作(并且问题在 3.4 中仍然存在)。

今天没时间追根究底,所以没有进一步的细节。

What is the best way to send through an arbitrary number of parameters, of whatever types I want?

最好的方法是使用命名参数,例如

#in requires
return MySampleSubTask(x=local_x, y=local_y)

class MySampleSubTask(luigi.Task):
    x = luigi.Parameter()
    y = luigi.Parameter()

What is the best way to send through an arbitrary number of parameters, of whatever types I want?

你可以按照这个例子。您将有一个占位符来定义传递所需的所有参数(ParameterCollector)。如果在许多子任务的情况下需要将参数传递给子任务,这将避免在每个单任务上定义参数。

class ParameterCollector(object):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def collect_params(self):
        return {'param1': self.param1, 'param2': self.param2}


class TaskB(ParameterCollector, luigi.Task):
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/tmp/task1_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.param1)


class TaskA(ParameterCollector, luigi.Task):
    def requires(self):
        a = TaskB(**self.collect_params())
        print(a)
        return a

    def output(self):
        return luigi.LocalTarget('/tmp/task2_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(str([self.param1, self.param2]))


if __name__ == '__main__':
    luigi.run()

您可以使用以下内容。

class DownloadFile(luigi.Task):
    """This is file download task used to get
     the file from file parser"""
    id = luigi.Parameter(default=uuid.uuid4().__str__(), positional=True)
    #master_task_id = luigi.Parameter(positional=True)
    pipeline_data = luigi.DictParameter(default={}, significant=False,
                                    
    visibility=luigi.parameter.ParameterVisibility.PRIVATE,
                                    positional=True)
def output(self):
    # add the file output here
    return MockTarget("FileDownload", mirror_on_stderr=True)

def run(self):
    time.sleep(3)
    parsed_data = self.pipeline_data['pipeline_data'] #this helps in parsing the dict