如何在 Airflow 的 MySqlOperator 中渲染带有参数的 .sql 文件?

How to render a .sql file with parameters in MySqlOperator in Airflow?

我需要帮助将参数(从上一个任务推送的 xcom)传递给 .sql 文件中的 SQL 查询。但是,我无法使用“参数”选项执行此操作,即使此选项能够呈现先前任务中的 xcom 值。让我知道我做错了什么。

谢谢:)

start = EmptyOperator(
            task_id="start",
    )

fetch_cust_id = PythonOperator(
    task_id = "fetch",
    python_callable = lambda: 'C001',
)

update_orders = MySqlOperator(
    task_id="update",
    mysql_conn_id="mysql_default",
    database="my_db",
    sql="/update.sql",
    parameters={
        "custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
    }
)

start >> fetch_cust_id >> update_orders

SQL 文件(update.sql):

UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;

:(

parameters 用于将“变量”传递给 SqlAlchemy 引擎。在这种情况下,渲染不是在 Airflow 引擎中完成的。如果你想使用它,你需要使用 SqlAlchemy 语法。 示例:

sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},

但是在你的情况下你想要模板 xcom 所以根本没有理由使用 parameters。您希望渲染由 Airflow 完成。

您可以直接在 sql 中设置它,因为 sql 是一个模板字段:

UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";