luigi: can a task run other tasks without creating a dependency?
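In luigi, I understand that if a task yields another task, the second task becomes a new dependency of the original task, and this causes the original task to be re-run after the yielded task completes.

However, in certain cases I would like a task to defer to another task without the deferred-to task becoming a dependency. The reason is that I don't want my current task's `run` method to be re-run after the other task completes.

Yes, I know that my `run` method should be idempotent. Nonetheless, there are cases where I definitely do not want that method to run a second time after yielding to the other task.

I figured out a way to do this, but I'm not sure it's the best solution, and I'd welcome any suggestions.

Suppose there are two tasks: `MainTask` and `OtherTask`. `MainTask` is invoked from the command line with various arguments. Depending on how those arguments are set, `MainTask` might invoke `OtherTask`. If it does, I do not want `MainTask`'s `run` method to be invoked a second time.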
    import luigi


    class OtherTask(luigi.Task):
        # Under some circumstances, this task can be invoked
        # from the command line, and it can also be invoked
        # in the normal luigi manner as a dependency for one
        # or more other tasks.
        # It also might be yielded to, as is done in the
        # "run" method for `MainTask`, below.
        id = luigi.parameter.IntParameter()

        def complete(self):
            # ...
            # return True or False depending on various tests.
            ...  # body elided in the original post

        def requires(self):
            # return [ ... various dependencies ... ]
            ...  # body elided in the original post

        def run(self):
            # do stuff with self.id
            # ...
            with self.output().open('w') as f:
                f.write('OK')

        def output(self):
            # The post returned a bare string here; a Target is needed
            # for .open() to work. LocalTarget is an assumption.
            return luigi.LocalTarget('... something ...')


    class MainTask(luigi.Task):
        # Parameters are expected to be supplied on the command line.
        param1 = luigi.parameter.IntParameter()
        param2 = luigi.parameter.BoolParameter()
        # ... etc. ...

        def run(self):
            # Here's how I keep this "run" method from being
            # invoked more than once. Is there a better way
            # to invoke `OtherTask` without having it cause
            # this current task to be re-invoked?
            if self.complete():
                return

            # Normal "run" processing for this task ...
            # ... etc. ...

            # Possibly run `OtherTask` multiple times, only if
            # certain conditions are met ...
            tasks = []
            if the_conditions_are_met:  # placeholder for the real condition
                ids = []
                # Append multiple integer ID's to the `ids` list.
                # Calculate each ID depending upon the values
                # passed in via self.param1, self.param2, etc.
                # Do some processing depending on these ID's.
                # ... etc. ...
                # Then, create a list of tasks to be invoked,
                # each one taking one of these ID's as a parameter.
                for the_id in ids:
                    tasks.append(OtherTask(id=the_id))

            # Mark this task as complete by writing its output.
            with self.output().open('w') as f:
                f.write('OK')

            # Optionally yield after marking this task as
            # complete, so that when the yielded tasks have
            # all run, this task's "run" method can test for
            # completion and not re-run its logic.
            if tasks:
                yield tasks

        def output(self):
            # LocalTarget is an assumption; the post returned a bare string.
            return luigi.LocalTarget('... whatever ...')
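The effect of the completion guard can be illustrated without luigi at all. The sketch below is a toy model, not luigi's worker: `ToyTask`, `UnguardedMain`, `GuardedMain`, and `toy_worker` are hypothetical names, and the driver simply assumes the behaviour described in the question, namely that `run()` is called again from the top whenever it yields a task that is not yet complete.

```python
class ToyTask:
    """Stand-in for a luigi task; `done` plays the role of an existing output."""

    def __init__(self, name):
        self.name = name
        self.done = False
        self.body_runs = 0  # how often the "real work" in run() executed

    def complete(self):
        return self.done

    def run(self):
        self.body_runs += 1
        self.done = True


class UnguardedMain(ToyTask):
    def __init__(self, children):
        super().__init__("main")
        self.children = children

    def run(self):
        self.body_runs += 1  # real work, repeated on every restart
        yield self.children
        self.done = True     # marked complete only after the yield


class GuardedMain(UnguardedMain):
    def run(self):
        if self.complete():  # the guard from the question
            return
        self.body_runs += 1  # real work, executed once
        self.done = True     # mark complete before yielding
        yield self.children


def toy_worker(task):
    """Call task.run() from the top until no yielded task is left incomplete.

    Returns the number of times run() was called.
    """
    calls = 0
    while True:
        calls += 1
        restart = False
        for requested in task.run():
            pending = [t for t in requested if not t.complete()]
            if pending:
                for t in pending:
                    t.run()  # children here are plain, non-generator tasks
                restart = True
                break
        if not restart:
            return calls


unguarded = UnguardedMain([ToyTask("a"), ToyTask("b")])
guarded = GuardedMain([ToyTask("a"), ToyTask("b")])
unguarded_calls = toy_worker(unguarded)
guarded_calls = toy_worker(guarded)
print(unguarded_calls, unguarded.body_runs)  # 2 2
print(guarded_calls, guarded.body_runs)      # 2 1
```

In both variants `run()` is entered twice, but with the guard the work before the yield executes only once. The guard does not stop the second invocation; it only makes that invocation return immediately.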
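Following up on my comment: using an auxiliary class seems to work. The auxiliary task runs only once, so even if the main class's `run` method is invoked multiple times, it just reuses the auxiliary class's output data instead of recomputing it.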
    import json

    import luigi


    class OtherTask(luigi.Task):
        # Under some circumstances, this task can be invoked
        # from the command line, and it can also be invoked
        # in the normal luigi manner as a dependency for one
        # or more other tasks.
        # It also might be yielded to, as is done in the
        # "run" method for `MainTask`, below.
        id = luigi.parameter.IntParameter()

        def complete(self):
            # ...
            # return True or False depending on various tests.
            ...  # body elided in the original post

        def requires(self):
            # return [ ... various dependencies ... ]
            ...  # body elided in the original post

        def run(self):
            # do stuff with self.id
            # ...
            with self.output().open('w') as f:
                f.write('OK')

        def output(self):
            # LocalTarget is an assumption; the post returned a bare string.
            return luigi.LocalTarget('... something ...')


    class AuxiliaryTask(luigi.Task):
        def requires(self):
            # return [ ... various dependencies ... ]
            ...  # body elided in the original post

        def run(self):
            ids = []
            # Append multiple integer ID's to the `ids` list.
            # Calculate each ID depending upon the values
            # passed to this task via its parameters. Then ...
            with self.output().open('w') as f:
                f.write(json.dumps(ids))

        def output(self):
            return luigi.LocalTarget('... something else ...')


    class MainTask(luigi.Task):
        # Parameters are expected to be supplied on the command line.
        param1 = luigi.parameter.IntParameter()
        param2 = luigi.parameter.BoolParameter()
        # ... etc. ...

        def requires(self):
            # Return a single task (not a list) so that self.input()
            # is a single target that can be opened directly.
            return self.clone(AuxiliaryTask)

        def run(self):
            # This method could get re-run after the yields,
            # below. However, it just re-reads its input, instead
            # of that input being recalculated. And in the second
            # invocation, luigi's dependency mechanism will prevent
            # any re-yielded-to tasks from repeating what they did
            # before.
            with self.input().open('r') as f:
                ids = json.loads(f.read())  # loads, not dumps: parse the saved list

            if ids:
                # Create a list of tasks to be invoked,
                # each one taking one of these ID's as a parameter.
                # Then, yield to each of these tasks.
                tasks = [OtherTask(id=the_id) for the_id in ids]
                yield tasks

            with self.output().open('w') as f:
                f.write('OK')

        def output(self):
            return luigi.LocalTarget('... whatever ...')
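One detail matters when the ID list is handed from `AuxiliaryTask` to `MainTask` as JSON text: the writer serializes with `json.dumps`, and the reader has to parse with `json.loads`. Calling `json.dumps` on the text that was read back yields another string, and iterating over a string produces characters rather than IDs. A minimal sketch of the round trip, using an in-memory `io.StringIO` buffer as a stand-in for the task's output file (the helper names `write_ids` and `read_ids` are hypothetical):

```python
import io
import json


def write_ids(f, ids):
    """Serialize the list of IDs as JSON text, as AuxiliaryTask.run does."""
    f.write(json.dumps(ids))


def read_ids(f):
    """Parse the JSON text back into a list of integers."""
    return json.loads(f.read())


buf = io.StringIO()      # stands in for the auxiliary task's output file
write_ids(buf, [3, 5, 8])
buf.seek(0)              # rewind, as if the file were reopened for reading
ids = read_ids(buf)

# What the reader would get if it called json.dumps on the text instead:
wrong = json.dumps(buf.getvalue())

print(ids)               # [3, 5, 8]
print(type(wrong))       # <class 'str'>
```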