执行嵌入在 DAG 文件中的 sql 代码

execute sql code which is embedded in a file in DAG

我遵循了上面 link 中的建议,该解决方案有效并且很好,如果我的 sql 是单行,它也有效。但是如果 SQL 代码很大并将它放在一个文件中并在函数中引用该文件则失败。

def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
    task_id='my_bq_query',
    sql='/dags/sqls/invalidTable.sql'
)

然后我收到错误消息:BigQuery 作业失败。最终错误是:{'reason': 'invalidQuery', 'location': 'query', 'message': 'Syntax error: Unexpected identifier "dags" at [1:1]'}

通常我按以下方式使用,下面的方法有效

BigQueryOperator(
        task_id='invalidXXX',
        use_legacy_sql=False,
        sql='/dags/sqls/invalid_v1.sql',
        destination_dataset_table=targetTable,
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )
   dag = DAG('invalidXXX', 
    default_args=default_args, 
    description='', 
    schedule_interval="0 5 * * *",
    catchup=False,
    template_searchpath=['/home/airflow/stgAirflow/']
   )

似乎错误来自尝试将此字符串 '/dags/sqls/invalid_v1.sql' 执行为 sql...这是无效的。

如果您想将 sql 保存在单独的文件中,您可以在那里读入文件内容吗?似乎 sql arg 需要一个实际的 sql 语句。

好的,我解决了这个问题。这意味着,当执行 dag 时,将使用并执行文件中的 sql 代码。不确定它是否是优化的解决方案。欢迎提出更多建议。

//define 
class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

//modify function
def loadCSV(**kwargs):
    print("inside loadCSV")
    query = kwargs['templates_dict']['query']
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql=query,

//dag - task
SQLTemplatedPythonOperator(
    task_id='invalidBBDToCSV',
    templates_dict={'query': 'invalidBBD.sql'},
    provide_context=True,
    python_callable=loadCSV,
    dag=dag,
//dag
dag = DAG('invalidBBDLoad', 
    default_args=default_args, 
    description='DAG data', 
    schedule_interval="0 11 * * *",
    catchup=False,
    template_searchpath=['/home/stgairflow/dags/sqls'], 
    user_defined_macros={'myProjectId': myProjectId,}
)