How to use jinja template in Airflow MySQL Operator

I'm currently running this query in Airflow's MySqlOperator. How can I use Jinja templating to replace the region and the S3 bucket with parameters?

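from airflow.providers.mysql.operators.mysql import MySqlOperator

# `dag` and MYSQL_CONN_ID are assumed to be defined elsewhere in the DAG file.
sql = """SELECT * FROM test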
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  parameters={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)

You can use Airflow Variables: https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

Airflow Jinja templating support: https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating

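Note that `parameters` is handed to the database driver for query-parameter binding; it is not exposed to Jinja. To pass dynamic values into the templated SQL, use `params` instead: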

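from airflow.providers.mysql.operators.mysql import MySqlOperator

# The "s3-" prefix is part of Aurora's S3 URI format; only the region itself is templated.
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'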
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\n'
OVERWRITE ON;
"""

mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  params={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)
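To check what the templated URI will look like before deploying, the substitution can be previewed offline. The sketch below is a stdlib-only approximation, not Jinja itself: it handles only plain `{{ params.<name> }}` placeholders, and `preview_params` is a hypothetical helper. It also shows why the `s3-` prefix has to stay in the template: Aurora expects a URI of the form `s3-<region>://<bucket>/<prefix>`, as in the original query.

```python
import re

# Stdlib-only approximation of how Jinja substitutes {{ params.<name> }}.
# It is NOT Jinja: filters, expressions and control blocks are not supported.
_PARAM_PATTERN = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def preview_params(template: str, params: dict) -> str:
    """Hypothetical helper: replace each {{ params.<name> }} with params[<name>]."""
    # A missing key raises KeyError, so a misspelled placeholder fails loudly.
    return _PARAM_PATTERN.sub(lambda m: str(params[m.group(1)]), template)


uri_template = "'s3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'"
params = {
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
}

print(preview_params(uri_template, params))
# prints 's3-ap-southeast-1://my-s3-bucket/my-key'
```

The rendered string matches the hard-coded URI from the question, which is a quick way to confirm the template and the `params` dict agree.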

If the values are stored in Airflow Variables (`region`, `s3_bucket`, `s3_key_prefix`), you can drop the `params` dict from the operator and change the sql to:

INTO OUTFILE S3 's3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'

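In both options, Airflow renders the sql string as a Jinja template and replaces the placeholders with the actual values when the operator executes. You can inspect the rendered values in the task's Rendered Template view in the UI.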
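For the Variables option, each `{{ var.value.<name> }}` is resolved at render time by looking up the Variable called `<name>`. The sketch below mimics that lookup with a dict-backed object so the rendered line can be previewed without an Airflow installation. `VariableValues` and `render` are hypothetical helpers and the dict stands in for the Variables store; in a real run, Jinja inside Airflow does the rendering.

```python
import re
from types import SimpleNamespace

# Matches {{ dotted.name }} placeholders such as {{ var.value.region }}.
_PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


class VariableValues:
    """Hypothetical stand-in for `var.value`: attribute access reads from a dict."""

    def __init__(self, store: dict):
        self._store = store

    def __getattr__(self, name: str) -> str:
        # Only called for names not found normally, i.e. the variable keys.
        try:
            return self._store[name]
        except KeyError:
            raise AttributeError(f"no variable named {name!r}") from None


def render(template: str, context: dict) -> str:
    """Resolve each dotted placeholder: first segment from context, rest via getattr."""

    def resolve(match: re.Match) -> str:
        head, *rest = match.group(1).split(".")
        obj = context[head]
        for attr in rest:
            obj = getattr(obj, attr)
        return str(obj)

    return _PLACEHOLDER.sub(resolve, template)


variables = {
    "region": "ap-southeast-1",
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
}
context = {"var": SimpleNamespace(value=VariableValues(variables))}

line = ("INTO OUTFILE S3 "
        "'s3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'")
print(render(line, context))
# prints INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
```

Unlike `params`, the values here live outside the DAG file, so changing a Variable changes the rendered SQL on the next run without editing code.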