气流:将 {{ ds }} 作为参数传递给 PostgresOperator

Airflow: pass {{ ds }} as param to PostgresOperator

我想将执行日期用作我的 sql 文件的参数:

我试过了

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)

但是没用。

dt = '{{ ds }}'

不起作用,因为 Jinja(气流中使用的模板引擎)不处理整个 Dag 定义文件。

对于每个 Operator 都有 Jinja 将处理的字段,它们是运算符本身定义的一部分。

在这种情况下,如果您像这样扩展 PostgresOperator,则可以使 params 字段(实际上称为 parameters,请务必更改它)模板化:

class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql','parameters')

现在你应该可以做到:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)

PostgresOperator / JDBCOperator 继承自 BaseOperator。
BaseOperator 的输入参数之一是 params: self.params = params or {} # Available in templates!

因此,您应该能够在不创建新文件的情况下使用它 class:
(即使参数未包含在 template_fields 中) t1 = JdbcOperator( task_id='copy', sql='copy.sql', jdbc_conn_id='connection_name', params={'schema_name':'public'}, dag=dag )

SQL 语句 (copy.sql) 可能如下所示: copy {{ params.schema_name }}.table_name from 's3://.../table_name.csv' iam_role 'arn:aws:iam::<acc_num>:role/<role_name>' csv IGNOREHEADER 1

注:

copy.sql 与 DAG 所在的位置相同。
或者
您可以在 "default_args"
中定义 "template_searchpath" 变量,并指定模板文件所在文件夹的绝对路径。
例如:'template_searchpath': '/home/user/airflow/templates/'