luigi:任务运行其他任务而不创建依赖关系?

luigi: task runs other tasks without creating dependency?

在 luigi 中,我了解到如果一个任务屈服于另一个任务,则第二个任务将成为原始任务的新依赖项,这会导致原始任务在屈服任务之后重新运行完成。

但是,在某些情况下,我希望一个任务可以推迟到另一个任务,而推迟到的任务不会成为依赖项。我想要这个的原因是因为我不希望我当前任务的 run 方法在另一个任务完成后重新 运行。

是的,我知道我的 run 方法应该是幂等的。尽管如此,在某些情况下,我绝对不希望该方法在屈服于其他任务后再次 运行。

我想出了一个方法来做到这一点,但我不确定它是否是最好的解决方案,如果你们有任何建议,我想要一些建议。

假设有两个任务:MainTaskOtherTaskMainTask 使用各种参数通过命令行调用。根据这些参数的设置,MainTask 可能会调用 OtherTask。如果是这样,我不希望 MainTaskrun 方法被第二次调用。

class OtherTask(luigi.Task):
    # Under some circumstances, this task can be invoked
    # from the command line, and it can also be invoked
    # in the normal luigi manner as a dependency for one
    # or more other tasks.
    # It also might be yielded to, as is done in the
    # "run" method for `MainTask`, below.

    id = luigi.parameter.IntParameter()

    def complete(self):
        # ...
        # return True or False depending on various tests.

    def requires(self):
        # return [ ... various dependencies ... ]

    def run(self):
        # do stuff with self.id
        # ...
        with self.output().open('w') as f:
            f.write('OK')

    def output(self):
        return '... something ...'

class MainTask(luigi.Task):
    # Parameters are expected to be supplied on the command line.
    param1 = luigi.parameter.IntParameter()
    param2 = luigi.parameter.BoolParameter()
    # ... etc. ...

    def run(self):
        #
        # Here's how I keep this "run" method from being
        # invoked more than once. Is there a better way
        # to invoke `OtherTask` without having it cause 
        # this current task to be re-invoked?
        if self.complete():
            return

        # Normal "run" processing for this task ...
        # ... etc. ...

        # Possibly run `OtherTask` multiple times, only if
        # certain conditions are met ... 
        tasks = []
        if the_conditions_are_met:
            ids = []
            # Append multiple integer ID's to the `ids` list.
            # Calculate each ID depending upon the values
            # passed in via self.param1, self.param2, etc.
            # Do some processing depending on these ID's.
            # ... etc. ...

            # Then, create a list of tasks to be invoked,
            # each one taking one of these ID's as a parameter.
            for the_id in ids:
                tasks.append(OtherTask(id=the_id))

        with self.output().open('w') as f:
            f.write('OK')

        # Optionally yield after marking this task as 
        # complete, so that when the yielded tasks have
        # all run, this task's "run" method can test for
        # completion and not re-run its logic.
        if tasks:
            yield tasks

    def output(self):
        return '... whatever ...'        

根据我的评论,使用辅助 class 似乎可行。它只会运行一次,即使多次调用主class的run方法,它也只是重用辅助class的输出数据而没有它正在重新计算。

class OtherTask(luigi.Task):
    # Under some circumstances, this task can be invoked
    # from the command line, and it can also be invoked
    # in the normal luigi manner as a dependency for one
    # or more other tasks.
    # It also might be yielded to, as is done in the
    # "run" method for `MainTask`, below.

    id = luigi.parameter.IntParameter()

    def complete(self):
        # ...
        # return True or False depending on various tests.

    def requires(self):
        # return [ ... various dependencies ... ]

    def run(self):
        # do stuff with self.id
        # ...
        with self.output().open('w') as f:
            f.write('OK')

    def output(self):
        return '... something ...'

class AuxiliaryTask(luigi.Task):

    def requires(self):
        # return [ ... various dependencies ... ]

    def run(self):                
        ids = []
        # Append multiple integer ID's to the `ids` list.
        # Calculate each ID depending upon the values
        # passed to this task via its parameters. Then ...
        with self.output().open('w') as f:
            f.write(json.dumps(ids))

    def output(self):
        return '... something else ...' 

class MainTask(luigi.Task):
    # Parameters are expected to be supplied on the command line.
    param1 = luigi.parameter.IntParameter()
    param2 = luigi.parameter.BoolParameter()
    # ... etc. ...

    def requires(self):
        return [ self.clone(AuxiliaryTask) ]

    def run(self):
        # This method could get re-run after the yields,
        # below. However, it just re-reads its input, instead
        # of that input being recalculated. And in the second
        # invocation, luigi's dependency mechanism will prevent
        # any re-yielded-to tasks from repeating what they did
        # before.
        ids = []
        with self.input().open('r') as f:
            ids = json.dumps(f.read())

        if ids:
            tasks = []

            # Create a list of tasks to be invoked,
            # each one taking one of these ID's as a parameter.
            # Then, yield to each of these tasks.
            for the_id in ids:
                tasks.append(OtherTask(id=the_id))
            if tasks:
                yield tasks

        with self.output().open('w') as f:
            f.write('OK')


    def output(self):
        return '... whatever ...'