How to use a Jinja template in the Airflow MySQL Operator
I am currently running this query in Airflow's MySqlOperator. How can I use a Jinja template to replace the region and the S3 bucket with parameters?
- Airflow version: 2.0.2
- Python: 3.7
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES
TERMINATED BY '\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
task_id="mysql_to_s3",
dag=dag,
sql=rds_sql,
mysql_conn_id=MYSQL_CONN_ID,
parameters={
"s3_bucket": "my-s3-bucket",
"s3_key_prefix": "my-key",
"region": "ap-southeast-1",
},
autocommit=False,
database="test",
)
You can use Airflow Variables - https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
Airflow supports Jinja templating - https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating
You can use params to pass dynamic values into your SQL:
sql = """SELECT * FROM test
INTO OUTFILE S3 '{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES
TERMINATED BY '\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
task_id="mysql_to_s3",
dag=dag,
sql=sql,
mysql_conn_id=MYSQL_CONN_ID,
params={
"s3_bucket": "my-s3-bucket",
"s3_key_prefix": "my-key",
"region": "ap-southeast-1",
},
autocommit=False,
database="test",
)
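To see what this rendering produces, the substitution can be sketched with standalone jinja2 (the same engine Airflow uses at task runtime); the `params` dict is exposed to the template under the name `params`. Note the `s3-` scheme prefix, taken from the literal URI in the original query:

```python
# Standalone sketch of Airflow's render step, using jinja2 directly.
# Airflow does this itself when the task runs; this is only for illustration.
from jinja2 import Template

sql = (
    "SELECT * FROM test INTO OUTFILE S3 "
    "'s3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'"
)
rendered = Template(sql).render(
    params={
        "s3_bucket": "my-s3-bucket",
        "s3_key_prefix": "my-key",
        "region": "ap-southeast-1",
    }
)
print(rendered)
# SELECT * FROM test INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
```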
If the values are stored in Airflow Variables (region, s3_bucket, s3_key_prefix), you can remove the params dict from the operator and change the sql to:
INTO OUTFILE S3 's3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'
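The Variables option can be sketched the same way. Here `SimpleNamespace` is a hypothetical stand-in for Airflow's `var.value` accessor, so the rendering can be shown without a running Airflow instance; in a real deployment the values come from Variables named region, s3_bucket and s3_key_prefix:

```python
# Sketch only: SimpleNamespace mimics Airflow's `var.value` lookup so the
# template can be rendered outside of Airflow. The real values would come
# from Airflow Variables (Admin -> Variables, or `airflow variables set`).
from types import SimpleNamespace

from jinja2 import Template

sql = (
    "INTO OUTFILE S3 "
    "'s3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'"
)
fake_var = SimpleNamespace(
    value=SimpleNamespace(
        region="ap-southeast-1",
        s3_bucket="my-s3-bucket",
        s3_key_prefix="my-key",
    )
)
print(Template(sql).render(var=fake_var))
# INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
```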
With either option, Airflow templates the sql string and replaces the placeholders with the actual values when the operator executes. You can see the rendered values in the task's Rendered tab in the UI.