在 Luigi Orchestrator 任务的 运行 函数中使用 luigi.DateParameter

use luigi.DateParameter in a run function of a Luigi Orchestrator Task

我有以下 python 使用 Luigi Orchestrator 的代码。

class AggregateArtists(luigi.Task):
    date = luigi.DateParameter(default=date.today() - timedelta(days=1))

    def requires(self):
        return []

    def run(self):
        ...

我想在 run() 函数中使用日期参数。问题是我不知道它是什么类型。在文档中,这个参数好像是一个datetime.date,所以我应该可以使用self.date.strftime()这个方法。但此方法不适用于 DateParameters.

我的问题是:

您的代码不完整,但我猜其余的代码如下所示。你一定在某个地方出错了,因为它有效:DateParameter returns 一个 python 日期的值。参见 luigi source code for details

我的tasks/foo.py:

from datetime import date, timedelta
import luigi


class AggregateArtists(luigi.Task):
    date = luigi.DateParameter(default=date.today() - timedelta(days=1))

    def output(self):
        return luigi.LocalTarget("/tmp/foobar.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write(self.date.strftime("%Y%m%d") + "\n")


if __name__ == "__main__":
    luigi.run()

运行任务:

$ python tasks/foo.py AggregateArtists --local-scheduler

DEBUG: Checking if AggregateArtists(date=2015-12-03) is complete
INFO: Scheduled AggregateArtists(date=2015-12-03) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) running   AggregateArtists(date=2015-12-03)
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) done      AggregateArtists(date=2015-12-03)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) was stopped. Shutting down Keep-Alive thread

打印输出文件的内容:

$ cat /tmp/foobar.txt 
20151203