如何在 Airflow 的 MySqlOperator 中渲染带有参数的 .sql 文件?
How to render a .sql file with parameters in MySqlOperator in Airflow?
我需要帮助将参数(从上一个任务推送的 xcom)传递给 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项执行此操作,即使此选项能够呈现先前任务中的 xcom 值。让我知道我做错了什么。
谢谢:)
start = EmptyOperator(
task_id="start",
)
fetch_cust_id = PythonOperator(
task_id = "fetch",
python_callable = lambda: 'C001',
)
update_orders = MySqlOperator(
task_id="update",
mysql_conn_id="mysql_default",
database="my_db",
sql="/update.sql",
parameters={
"custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
}
)
start >> fetch_cust_id >> update_orders
SQL 文件(update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
:(
parameters
用于将“变量”传递给 SqlAlchemy 引擎。在这种情况下,渲染不是在 Airflow 引擎中完成的。如果你想使用它,你需要使用 SqlAlchemy 语法。
示例:
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
但是在你的情况下你想要模板 xcom 所以根本没有理由使用 parameters
。您希望渲染由 Airflow 完成。
您可以直接在 sql
中设置它,因为 sql 是一个模板字段:
UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";
我需要帮助将参数(从上一个任务推送的 xcom)传递给 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项执行此操作,即使此选项能够呈现先前任务中的 xcom 值。让我知道我做错了什么。
谢谢:)
start = EmptyOperator(
task_id="start",
)
fetch_cust_id = PythonOperator(
task_id = "fetch",
python_callable = lambda: 'C001',
)
update_orders = MySqlOperator(
task_id="update",
mysql_conn_id="mysql_default",
database="my_db",
sql="/update.sql",
parameters={
"custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
}
)
start >> fetch_cust_id >> update_orders
SQL 文件(update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
:(
parameters
用于将“变量”传递给 SqlAlchemy 引擎。在这种情况下,渲染不是在 Airflow 引擎中完成的。如果你想使用它,你需要使用 SqlAlchemy 语法。
示例:
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
但是在你的情况下你想要模板 xcom 所以根本没有理由使用 parameters
。您希望渲染由 Airflow 完成。
您可以直接在 sql
中设置它,因为 sql 是一个模板字段:
UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";