Airflow MySQL 运算符尝试将脚本路径字符串执行为 SQL,而不是使用模板

Airflow MySQL operator trying to execute script path string as SQL, rather than using template

我有一个关于 Airflow 的令人困惑的问题,我不明白。

我在 DML/analytics/my_script.sql 有一个 SQL 脚本文件夹。 MySQL 运算符在正常情况下工作得很好,但当我尝试从 Python 运算符调用它时却没有,如下所示。这是因为需要从另一个任务传入 XCOM 值:

def insert_func(**kwargs):
    run_update = MySqlOperator(
        sql='DML/analytics/my_script.sql',
        task_id='insert_func',
        mysql_conn_id="bi_mysql",
        params={
            "table_name": table_name, 
            'ts': kwargs['task_instance'].xcom_pull(key='return_value',task_ids='get_existing_data')
        },
    )
    run_update.execute(context=kwargs['task_instance'])

with DAG("my_dag", **dag_params) as dag:
    with TaskGroup(group_id='insert') as insert:
        get_existing_data = PythonOperator(
            task_id='get_existing_data',
            python_callable=MySQLGetRecord,
            op_kwargs={
                'target_db_conn_id':'bi_mysql', 
                'target_db':'analytics', 
                'sql': f'SELECT invoice_date FROM analytics.{table_name} ORDER BY 1 DESC'
            }
        ),
        insert = PythonOperator(
            task_id='insert',
            python_callable=insert_func
        )
        get_existing_data >> insert_func

我得到的错误是:MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'DML/analytics/my_script.sql' at line 1")

很明显,它试图 运行 在 sql 参数中传递的文字字符串,而不是将其用作文件位置。为什么会这样?同样,如果我将 run_update 任务移动到 my_dag with 子句,这会起作用,但我需要这样做才能从 get_existing_data 中获取 XCOM 值,正确。 ..?

当您正常使用运算符时(例如,由 Airflow 使用),Airflow 负责整个任务生命周期。这意味着 Airflow 处理模板、执行 pre_execute()、执行 execute()、执行 on_faulure/retries 等...

您所做的是在包含 MySqlOperator 的运算符 -> PythonOperator 中使用运算符。在这种情况下,内部运算符 (MySqlOperator) 只是一个常规的 Python class。虽然它被称为 Operator,但它并不是“真正的”Operator。 您没有像预期的那样享受任何生命周期步骤。

您可能已经意识到,在您自己的示例中,您专门触发了 execute():

run_update.execute(context=kwargs['task_instance'])

请注意,您不需要为 PythonOperaor 执行此操作。

在代码库中可以看到 Airflow 调用了 render_templates before it invokes pre_execute() and before it invokes execute().

这意味着如果您希望对 MySqlOperator 进行模板化,您需要在调用 execute()

之前调用执行模板化的函数

也就是说 - 我强烈建议你 - 不要在运算符内部使用运算符。

从你的代码中我看不出为什么你不能在没有 PythonOperaor 的情况下直接使用 MySqlOperator 的原因但是应该有一个理由来处理它的正确方法是创建一个CustomMySqlOperator 处理您寻求的逻辑。通过这样做,您在使用 .sql 文件时不会遇到问题。