在 luigi 中自动实例化？

Automatic instantiation in Luigi?

在 luigi.Task.run 中，我们需要从文件/数据库等读入数据并实例化为对象，处理后再把输出对象序列化到文件/数据库等：

class MyTask(luigi.Task):
    """Baseline task: loads its input CSV by hand, then writes a CSV output."""

    param = luigi.Parameter()

    def requires(self):
        # The dependency must be *returned*; a bare expression statement makes
        # the method return None, i.e. declare no dependency at all.
        return AnotherTask(self.param)

    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        with self.input().open('r') as infile:
            # instantiate incoming data. read_csv has no ``index`` argument
            # (that belongs to to_csv); add options such as parse_dates=[...]
            # here as needed.
            indata = pd.read_csv(infile)
        # my process
        outdata = indata  # placeholder for the real processing step
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False)

但为了方便起见，我想省去 pd.read_csv(...) 这段代码，因为每次复用任务时都必须重复编写相同的实例化步骤。

在 luigi 中有没有类似下面这样自动实例化的方法？

class AnotherTask(luigi.Task):
    """Upstream task that knows how to load (instantiate) its own output."""

    param = luigi.Parameter()

    def requires(self):
        ...

    def output(self):
        ...

    def _instantiate(self):
        """Read this task's output CSV back and return it as a DataFrame."""
        # The handle is opened for reading, so it is named accordingly.
        with self.output().open('r') as infile:
            # read_csv takes no ``index`` argument (that is a to_csv option);
            # the date option is spelled ``parse_dates`` — add it as needed.
            outdata = pd.read_csv(infile)
        return outdata

class MyTask(luigi.Task):
    """Desired usage: receive already-instantiated data from the upstream task."""

    param = luigi.Parameter()

    def requires(self):
        # Must return the dependency; otherwise the method returns None.
        return AnotherTask(self.param)

    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        # Wished-for API: automatic instantiation via AnotherTask._instantiate().
        # NOTE(review): elsewhere input() is used as a target to .open(), not as
        # loaded data; the explicit equivalent is self.requires()._instantiate().
        indata = self.input()
        # my process
        outdata = indata.someprocess()
        with self.output().open('w') as outfile:
            # serialize outgoing data (no positional args may follow keywords)
            outdata.to_csv(outfile, index=False)

提问者自答：

def getinstances(struct):
    """Recursively replace every luigi Task in *struct* with its loaded data.

    Args:
        struct: a ``luigi.Task``, a dict whose values are such structures, or
            any other (non-string) iterable of such structures -- e.g. the value
            returned by a task's ``requires()``.

    Returns:
        For a Task, the result of its ``instantiate()`` method; for a dict, a
        new dict with the same keys and instantiated values; for any other
        iterable, a list of instantiated items.

    Raises:
        TypeError: if *struct*, or anything nested in it, is not a Task, a
            dict, or a non-string iterable.
        AttributeError: if a Task in *struct* has no ``instantiate()`` method.
    """
    if isinstance(struct, luigi.Task):
        return struct.instantiate()
    if isinstance(struct, dict):
        return {k: getinstances(v) for k, v in six.iteritems(struct)}
    # A string is iterable and every 1-character string is itself iterable,
    # so without this guard a stray string recursed until RecursionError.
    if isinstance(struct, six.string_types):
        raise TypeError('Cannot map %s to Task/dict/list' % str(struct))
    # Remaining case: assume struct is an iterable of nested structures.
    try:
        items = list(struct)
    except TypeError:
        # TypeError subclasses Exception, so callers catching the old generic
        # Exception still work.
        raise TypeError('Cannot map %s to Task/dict/list' % str(struct))
    return [getinstances(item) for item in items]

class MyParentTask(luigi.Task):
    """Upstream task whose output can be loaded back via ``instantiate()``."""

    def requires(self):...
    def output(self):...
    def run(self):...

    def instantiate(self):
        """Load this task's CSV output and return it as a list of rows.

        Each row is the list of field strings yielded by ``csv.reader``.
        """
        # The handle is only read from, so name it as an input.
        with self.output().open() as infile:
            # list() drains the reader while the file is still open.
            return list(csv.reader(infile))

class MyChildTask(luigi.Task):
    """Downstream task that consumes MyParentTask's instantiated output."""

    def requires(self):
        return MyParentTask()

    def output(self):
        ...

    def run(self):
        # getinstances() swaps each required task for its instantiate() result,
        # so this receives the parent's loaded data rather than a target.
        parent_data = getinstances(self.requires())
        ...