Python Luigi 任务结构
Python Luigi Task structure
class Download(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar.bz2".format(self.date_interval))
def run(self):
#import pdb; pdb.set_trace()
SENTENCE_URL = 'http://downloads.org/exports/user_lists.tar.bz2'
sentence_file = download(SENTENCE_URL, out=self.output().path)
class Uncompress(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar".format(self.date_interval))
def requires(self):
return Download(self.date_interval)
def run(self):
with open(self.output().path, 'wb') as tar_file, open(self.input().path, 'rb') as file:
decompressor = BZ2Decompressor()
#loop over each tar file in the bzip file
for data in iter(lambda : file.read(100 * 1024), b''):
tar_file.write(decompressor.decompress(data))
我的第一个任务是从互联网上下载一个文件,下一个任务是解压缩它。我要编写的下一个任务将从 tar 文件中的 CSV 文件读取并将其解析为多个文件。即 data/file_{var}、data/faile_{var2}.. 等。但我相信任务 3 需要有一个日期间隔才能传递给其他任务。
是否有解决此问题的方法或更好的方法来构建我的任务?
您可以做几件事。从文档:
http://luigi.readthedocs.io/en/stable/parameters.html
Parameters are resolved in the following order of decreasing priority:
1. Any value passed to the constructor, or task level value set on the command line (applies on an instance level)
2. Any value set on the command line (applies on a class level)
3. Any configuration option (applies on a class level)
4. Any default value provided to the parameter (applies on a class level)
在命令行中,您可以执行以下操作:
luigi Uncompress --Download-dateinverval 2017-02-03
将参数传递给层次结构中的其他任务。
class Download(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar.bz2".format(self.date_interval))
def run(self):
#import pdb; pdb.set_trace()
SENTENCE_URL = 'http://downloads.org/exports/user_lists.tar.bz2'
sentence_file = download(SENTENCE_URL, out=self.output().path)
class Uncompress(Task):
date_interval = DateIntervalParameter()
def output(self):
return LocalTarget("data/user_{0}.tar".format(self.date_interval))
def requires(self):
return Download(self.date_interval)
def run(self):
with open(self.output().path, 'wb') as tar_file, open(self.input().path, 'rb') as file:
decompressor = BZ2Decompressor()
#loop over each tar file in the bzip file
for data in iter(lambda : file.read(100 * 1024), b''):
tar_file.write(decompressor.decompress(data))
我的第一个任务是从互联网上下载一个文件,下一个任务是解压缩它。我要编写的下一个任务将从 tar 文件中的 CSV 文件读取并将其解析为多个文件。即 data/file_{var}、data/faile_{var2}.. 等。但我相信任务 3 需要有一个日期间隔才能传递给其他任务。
是否有解决此问题的方法或更好的方法来构建我的任务?
您可以做几件事。从文档: http://luigi.readthedocs.io/en/stable/parameters.html
Parameters are resolved in the following order of decreasing priority:
1. Any value passed to the constructor, or task level value set on the command line (applies on an instance level)
2. Any value set on the command line (applies on a class level)
3. Any configuration option (applies on a class level)
4. Any default value provided to the parameter (applies on a class level)
在命令行中,您可以执行以下操作:
luigi Uncompress --Download-dateinverval 2017-02-03
将参数传递给层次结构中的其他任务。