Luigi:Rangehourly 示例

Luigi : Rangehourly Examples

是否有可用于 RangeHourly 条款的示例(或类似的示例,如 RangeDaily)。我一直在尝试用它来重复执行任务。但我总是会收到如下错误:

调试:检查 RangeHourly(of=FinalTask​​, of_params={}, reverse=False, task_limit=50, now=None, param_name=None,start=2017-06-28T15,stop=None,hours_back=0,hours_forward=0)完成 调试:空范围。预期没有 FinalTask​​ 实例

下面是任务的定义:

class FinalTask (luigi.Task):
    start = luigi.DateHourParameter()
    def requires(self):
            return CleanupTask()
    def run(self):
            cmd='echo "Workflow Completed"'
            args=shlex.split(cmd)
            exc=subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            stdout,stderr=exc.communicate()
            self.output().open('w').close()
    def output(self):
            return luigi.LocalTarget('/var/flags/FinalTask_success_%s.csv' %start)

我是否遗漏了什么导致此问题的原因?

python tasks.py RangeHourlyBase --of FinalTask --start 2017-07-31T00 --stop 2017-07-31T23 --local-scheduler --workers 4

注意:

  • 您必须将 tasks.py 替换为定义 FinalTask 的文件名。
  • --local-scheduler 仅用于 运行 您在本地的任务。不要在生产中使用它。
  • 最后一行有错误:start未定义,应该是:

    return luigi.LocalTarget('/var/flags/FinalTask_success_%s.csv' % self.start)