在 Luigi Orchestrator 任务的 运行 函数中使用 luigi.DateParameter
use luigi.DateParameter in a run function of a Luigi Orchestrator Task
我有以下 python 使用 Luigi Orchestrator 的代码。
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def requires(self):
return []
def run(self):
...
我想在 run()
函数中使用日期参数。问题是我不知道它是什么类型。在文档中,这个参数好像是一个datetime.date
,所以我应该可以使用self.date.strftime()
这个方法。但此方法不适用于 DateParameters
.
我的问题是:
如何在 运行 函数中使用代码的可变日期?它是什么类型的?一个字符串,一个 datetime.date 或者其他什么?
在某些时候,我需要将这个日期转换为 YYYYMMDD 形式的字符串,我该怎么做?
您的代码不完整,但我猜其余的代码如下所示。你一定在某个地方出错了,因为它有效:DateParameter
returns 一个 python 日期的值。参见 luigi source code for details。
我的tasks/foo.py
:
from datetime import date, timedelta
import luigi
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def output(self):
return luigi.LocalTarget("/tmp/foobar.txt")
def run(self):
with self.output().open('w') as out_file:
out_file.write(self.date.strftime("%Y%m%d") + "\n")
if __name__ == "__main__":
luigi.run()
运行任务:
$ python tasks/foo.py AggregateArtists --local-scheduler
DEBUG: Checking if AggregateArtists(date=2015-12-03) is complete
INFO: Scheduled AggregateArtists(date=2015-12-03) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) running AggregateArtists(date=2015-12-03)
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) done AggregateArtists(date=2015-12-03)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) was stopped. Shutting down Keep-Alive thread
打印输出文件的内容:
$ cat /tmp/foobar.txt
20151203
我有以下 python 使用 Luigi Orchestrator 的代码。
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def requires(self):
return []
def run(self):
...
我想在 run()
函数中使用日期参数。问题是我不知道它是什么类型。在文档中,这个参数好像是一个datetime.date
,所以我应该可以使用self.date.strftime()
这个方法。但此方法不适用于 DateParameters
.
我的问题是:
如何在 运行 函数中使用代码的可变日期?它是什么类型的?一个字符串,一个 datetime.date 或者其他什么?
在某些时候,我需要将这个日期转换为 YYYYMMDD 形式的字符串,我该怎么做?
您的代码不完整,但我猜其余的代码如下所示。你一定在某个地方出错了,因为它有效:DateParameter
returns 一个 python 日期的值。参见 luigi source code for details。
我的tasks/foo.py
:
from datetime import date, timedelta
import luigi
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def output(self):
return luigi.LocalTarget("/tmp/foobar.txt")
def run(self):
with self.output().open('w') as out_file:
out_file.write(self.date.strftime("%Y%m%d") + "\n")
if __name__ == "__main__":
luigi.run()
运行任务:
$ python tasks/foo.py AggregateArtists --local-scheduler
DEBUG: Checking if AggregateArtists(date=2015-12-03) is complete
INFO: Scheduled AggregateArtists(date=2015-12-03) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) running AggregateArtists(date=2015-12-03)
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) done AggregateArtists(date=2015-12-03)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) was stopped. Shutting down Keep-Alive thread
打印输出文件的内容:
$ cat /tmp/foobar.txt
20151203