Airflow MySQL 运算符尝试将脚本路径字符串执行为 SQL,而不是使用模板
Airflow MySQL operator trying to execute script path string as SQL, rather than using template
我有一个关于 Airflow 的令人困惑的问题,我不明白。
我在 DML/analytics/my_script.sql 有一个 SQL 脚本文件夹。 MySQL 运算符在正常情况下工作得很好,但当我尝试从 Python 运算符调用它时却没有,如下所示。这是因为需要从另一个任务传入 XCOM 值:
def insert_func(**kwargs):
run_update = MySqlOperator(
sql='DML/analytics/my_script.sql',
task_id='insert_func',
mysql_conn_id="bi_mysql",
params={
"table_name": table_name,
'ts': kwargs['task_instance'].xcom_pull(key='return_value',task_ids='get_existing_data')
},
)
run_update.execute(context=kwargs['task_instance'])
with DAG("my_dag", **dag_params) as dag:
with TaskGroup(group_id='insert') as insert:
get_existing_data = PythonOperator(
task_id='get_existing_data',
python_callable=MySQLGetRecord,
op_kwargs={
'target_db_conn_id':'bi_mysql',
'target_db':'analytics',
'sql': f'SELECT invoice_date FROM analytics.{table_name} ORDER BY 1 DESC'
}
),
insert = PythonOperator(
task_id='insert',
python_callable=insert_func
)
get_existing_data >> insert_func
我得到的错误是:MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'DML/analytics/my_script.sql' at line 1")
很明显,它试图 运行 在 sql
参数中传递的文字字符串,而不是将其用作文件位置。为什么会这样?同样,如果我将 run_update
任务移动到 my_dag with
子句,这会起作用,但我需要这样做才能从 get_existing_data
中获取 XCOM 值,正确。 ..?
当您正常使用运算符时(例如,由 Airflow 使用),Airflow 负责整个任务生命周期。这意味着 Airflow 处理模板、执行 pre_execute()
、执行 execute()
、执行 on_faulure/retries 等...
您所做的是在包含 MySqlOperator
的运算符 -> PythonOperator
中使用运算符。在这种情况下,内部运算符 (MySqlOperator
) 只是一个常规的 Python class。虽然它被称为 Operator,但它并不是“真正的”Operator。
您没有像预期的那样享受任何生命周期步骤。
您可能已经意识到,在您自己的示例中,您专门触发了 execute()
:
run_update.execute(context=kwargs['task_instance'])
请注意,您不需要为 PythonOperaor
执行此操作。
在代码库中可以看到 Airflow 调用了 render_templates
before it invokes pre_execute()
and before it invokes execute().
这意味着如果您希望对 MySqlOperator
进行模板化,您需要在调用 execute()
之前调用执行模板化的函数
也就是说 - 我强烈建议你 - 不要在运算符内部使用运算符。
从你的代码中我看不出为什么你不能在没有 PythonOperaor
的情况下直接使用 MySqlOperator
的原因但是应该有一个理由来处理它的正确方法是创建一个CustomMySqlOperator
处理您寻求的逻辑。通过这样做,您在使用 .sql
文件时不会遇到问题。
我有一个关于 Airflow 的令人困惑的问题,我不明白。
我在 DML/analytics/my_script.sql 有一个 SQL 脚本文件夹。 MySQL 运算符在正常情况下工作得很好,但当我尝试从 Python 运算符调用它时却没有,如下所示。这是因为需要从另一个任务传入 XCOM 值:
def insert_func(**kwargs):
run_update = MySqlOperator(
sql='DML/analytics/my_script.sql',
task_id='insert_func',
mysql_conn_id="bi_mysql",
params={
"table_name": table_name,
'ts': kwargs['task_instance'].xcom_pull(key='return_value',task_ids='get_existing_data')
},
)
run_update.execute(context=kwargs['task_instance'])
with DAG("my_dag", **dag_params) as dag:
with TaskGroup(group_id='insert') as insert:
get_existing_data = PythonOperator(
task_id='get_existing_data',
python_callable=MySQLGetRecord,
op_kwargs={
'target_db_conn_id':'bi_mysql',
'target_db':'analytics',
'sql': f'SELECT invoice_date FROM analytics.{table_name} ORDER BY 1 DESC'
}
),
insert = PythonOperator(
task_id='insert',
python_callable=insert_func
)
get_existing_data >> insert_func
我得到的错误是:MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'DML/analytics/my_script.sql' at line 1")
很明显,它试图 运行 在 sql
参数中传递的文字字符串,而不是将其用作文件位置。为什么会这样?同样,如果我将 run_update
任务移动到 my_dag with
子句,这会起作用,但我需要这样做才能从 get_existing_data
中获取 XCOM 值,正确。 ..?
当您正常使用运算符时(例如,由 Airflow 使用),Airflow 负责整个任务生命周期。这意味着 Airflow 处理模板、执行 pre_execute()
、执行 execute()
、执行 on_faulure/retries 等...
您所做的是在包含 MySqlOperator
的运算符 -> PythonOperator
中使用运算符。在这种情况下,内部运算符 (MySqlOperator
) 只是一个常规的 Python class。虽然它被称为 Operator,但它并不是“真正的”Operator。
您没有像预期的那样享受任何生命周期步骤。
您可能已经意识到,在您自己的示例中,您专门触发了 execute()
:
run_update.execute(context=kwargs['task_instance'])
请注意,您不需要为 PythonOperaor
执行此操作。
在代码库中可以看到 Airflow 调用了 render_templates
before it invokes pre_execute()
and before it invokes execute().
这意味着如果您希望对 MySqlOperator
进行模板化,您需要在调用 execute()
也就是说 - 我强烈建议你 - 不要在运算符内部使用运算符。
从你的代码中我看不出为什么你不能在没有 PythonOperaor
的情况下直接使用 MySqlOperator
的原因但是应该有一个理由来处理它的正确方法是创建一个CustomMySqlOperator
处理您寻求的逻辑。通过这样做,您在使用 .sql
文件时不会遇到问题。