执行嵌入在 DAG 文件中的 sql 代码
execute sql code which is embedded in a file in DAG
我遵循了上面 link 中的建议,该解决方案有效并且很好,如果我的 sql 是单行,它也有效。但是如果 SQL 代码很大并将它放在一个文件中并在函数中引用该文件则失败。
def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
task_id='my_bq_query',
sql='/dags/sqls/invalidTable.sql'
)
然后我收到错误消息:BigQuery 作业失败。最终错误是:{'reason': 'invalidQuery', 'location': 'query', 'message': 'Syntax error: Unexpected identifier "dags" at [1:1]'}
通常我按以下方式使用,下面的方法有效
BigQueryOperator(
task_id='invalidXXX',
use_legacy_sql=False,
sql='/dags/sqls/invalid_v1.sql',
destination_dataset_table=targetTable,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
dag=dag
)
dag = DAG('invalidXXX',
default_args=default_args,
description='',
schedule_interval="0 5 * * *",
catchup=False,
template_searchpath=['/home/airflow/stgAirflow/']
)
似乎错误来自尝试将此字符串 '/dags/sqls/invalid_v1.sql'
执行为 sql...这是无效的。
如果您想将 sql 保存在单独的文件中,您可以在那里读入文件内容吗?似乎 sql arg 需要一个实际的 sql 语句。
好的,我解决了这个问题。这意味着,当执行 dag 时,将使用并执行文件中的 sql 代码。不确定它是否是优化的解决方案。欢迎提出更多建议。
//define
class SQLTemplatedPythonOperator(PythonOperator):
template_ext = ('.sql',)
//modify function
def loadCSV(**kwargs):
print("inside loadCSV")
query = kwargs['templates_dict']['query']
big_query_count = bigquery_operator.BigQueryOperator(
task_id='my_bq_query',
sql=query,
//dag - task
SQLTemplatedPythonOperator(
task_id='invalidBBDToCSV',
templates_dict={'query': 'invalidBBD.sql'},
provide_context=True,
python_callable=loadCSV,
dag=dag,
//dag
dag = DAG('invalidBBDLoad',
default_args=default_args,
description='DAG data',
schedule_interval="0 11 * * *",
catchup=False,
template_searchpath=['/home/stgairflow/dags/sqls'],
user_defined_macros={'myProjectId': myProjectId,}
)
我遵循了上面 link 中的建议,该解决方案有效并且很好,如果我的 sql 是单行,它也有效。但是如果 SQL 代码很大并将它放在一个文件中并在函数中引用该文件则失败。
def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
task_id='my_bq_query',
sql='/dags/sqls/invalidTable.sql'
)
然后我收到错误消息:BigQuery 作业失败。最终错误是:{'reason': 'invalidQuery', 'location': 'query', 'message': 'Syntax error: Unexpected identifier "dags" at [1:1]'}
通常我按以下方式使用,下面的方法有效
BigQueryOperator(
task_id='invalidXXX',
use_legacy_sql=False,
sql='/dags/sqls/invalid_v1.sql',
destination_dataset_table=targetTable,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
dag=dag
)
dag = DAG('invalidXXX',
default_args=default_args,
description='',
schedule_interval="0 5 * * *",
catchup=False,
template_searchpath=['/home/airflow/stgAirflow/']
)
似乎错误来自尝试将此字符串 '/dags/sqls/invalid_v1.sql'
执行为 sql...这是无效的。
如果您想将 sql 保存在单独的文件中,您可以在那里读入文件内容吗?似乎 sql arg 需要一个实际的 sql 语句。
好的,我解决了这个问题。这意味着,当执行 dag 时,将使用并执行文件中的 sql 代码。不确定它是否是优化的解决方案。欢迎提出更多建议。
//define
class SQLTemplatedPythonOperator(PythonOperator):
template_ext = ('.sql',)
//modify function
def loadCSV(**kwargs):
print("inside loadCSV")
query = kwargs['templates_dict']['query']
big_query_count = bigquery_operator.BigQueryOperator(
task_id='my_bq_query',
sql=query,
//dag - task
SQLTemplatedPythonOperator(
task_id='invalidBBDToCSV',
templates_dict={'query': 'invalidBBD.sql'},
provide_context=True,
python_callable=loadCSV,
dag=dag,
//dag
dag = DAG('invalidBBDLoad',
default_args=default_args,
description='DAG data',
schedule_interval="0 11 * * *",
catchup=False,
template_searchpath=['/home/stgairflow/dags/sqls'],
user_defined_macros={'myProjectId': myProjectId,}
)