重复使用类似的 luigi 任务
re-use similar luigi tasks
我有一个 luigi 任务,它读取一个 .sql 文件并输出到 BigQuery。
我的问题是有什么方法可以用不同的 .sql 文件重用同一个任务,而不必复制整个 luigi 任务,即我想创建模板 luigi 任务的实例。
class run_sql(luigi.task):
sql_file = 'path/to/sql/file' # This is the only bit of code that changes
def complete(self):
...
def requires(self):
...
def run(self):
...
只需使用a parameter指定文件路径即可。像这样:
class RunSql(luigi.task):
sql_file = luigi.Parameter()
def complete(self):
...
def requires(self):
...
def run(self):
...
为了访问参数的值,只需在您的代码中使用 self.sql_file
。
之后你可以这样运行你的任务:
luigi RunSql --sql-file path/to/file.sql
基于@matagus 的回答,您还可以使用 complete()
、requires()
subclass RunSql
来定义 sql 文件, 以及父 class.
的 run()
方法
class RunSqlFile(RunSql):
sql_file = '/path/to/file.sql`
或者您可以使用 @property
装饰器来引用 RunSql
class 的属性。我经常这样做是为了在父 class 中设置目录或其他配置数据,然后在子 classes.
中引用它们
class RunSql(luigi.Task):
sql_file = luigi.Parameter()
def get_file(self, name):
default_dir = '/path/to/sql/dir'
return os.path.join(default_dir, name)
def requires(self):
...
class RunSqlFile(RunTask):
@property
def sql_file(self):
return self.get_file("query.sql")
这就好像您用 --sql-file /path/to/sql/dir/query.sql
实例化了 class
我有一个 luigi 任务,它读取一个 .sql 文件并输出到 BigQuery。
我的问题是有什么方法可以用不同的 .sql 文件重用同一个任务,而不必复制整个 luigi 任务,即我想创建模板 luigi 任务的实例。
class run_sql(luigi.task):
sql_file = 'path/to/sql/file' # This is the only bit of code that changes
def complete(self):
...
def requires(self):
...
def run(self):
...
只需使用a parameter指定文件路径即可。像这样:
class RunSql(luigi.task):
sql_file = luigi.Parameter()
def complete(self):
...
def requires(self):
...
def run(self):
...
为了访问参数的值,只需在您的代码中使用 self.sql_file
。
之后你可以这样运行你的任务:
luigi RunSql --sql-file path/to/file.sql
基于@matagus 的回答,您还可以使用 complete()
、requires()
subclass RunSql
来定义 sql 文件, 以及父 class.
run()
方法
class RunSqlFile(RunSql):
sql_file = '/path/to/file.sql`
或者您可以使用 @property
装饰器来引用 RunSql
class 的属性。我经常这样做是为了在父 class 中设置目录或其他配置数据,然后在子 classes.
class RunSql(luigi.Task):
sql_file = luigi.Parameter()
def get_file(self, name):
default_dir = '/path/to/sql/dir'
return os.path.join(default_dir, name)
def requires(self):
...
class RunSqlFile(RunTask):
@property
def sql_file(self):
return self.get_file("query.sql")
这就好像您用 --sql-file /path/to/sql/dir/query.sql