重复使用类似的 luigi 任务

re-use similar luigi tasks

我有一个 luigi 任务,它读取一个 .sql 文件并输出到 BigQuery。

我的问题是有什么方法可以用不同的 .sql 文件重用同一个任务,而不必复制整个 luigi 任务,即我想创建模板 luigi 任务的实例。

class run_sql(luigi.task):
    sql_file = 'path/to/sql/file'  # This is the only bit of code that changes 
    def complete(self):
        ...
    def requires(self):
        ...
    def run(self):
        ...

只需使用a parameter指定文件路径即可。像这样:

class RunSql(luigi.task):

    sql_file = luigi.Parameter()

    def complete(self):
        ...

    def requires(self):
        ...

    def run(self):
        ...

为了访问参数的值,只需在您的代码中使用 self.sql_file

之后你可以这样运行你的任务:

luigi RunSql --sql-file path/to/file.sql

基于@matagus 的回答,您还可以使用 complete()requires() subclass RunSql 来定义 sql 文件, 以及父 class.

run() 方法
class RunSqlFile(RunSql):
    sql_file = '/path/to/file.sql`

或者您可以使用 @property 装饰器来引用 RunSql class 的属性。我经常这样做是为了在父 class 中设置目录或其他配置数据,然后在子 classes.

中引用它们
class RunSql(luigi.Task):
    sql_file = luigi.Parameter()

    def get_file(self, name):
        default_dir = '/path/to/sql/dir'
        return os.path.join(default_dir, name)

   def requires(self):
        ...


class RunSqlFile(RunTask):

    @property
    def sql_file(self):
        return self.get_file("query.sql")

这就好像您用 --sql-file /path/to/sql/dir/query.sql

实例化了 class