Airflow 2 - BigQueryInsertJobOperator - how to resolve airflow.exceptions.AirflowException: Argument ['configuration'] is required
I am using BigQueryInsertJobOperator in Airflow 2.1.4 and get the following error:
airflow.exceptions.AirflowException: Argument ['configuration'] is required
How do I define the configuration correctly?
Code:
my_bq_task = BigQueryInsertJobOperator(
    dag=dag,
    task_id="my_bq_task",
    gcp_conn_id="google_cloud_default",
    configuration={
        "query": "{% include '/sql/load_crm_interactions.sql' %}",
        "destinationTable": {
            "projectId": bq_prj,
            "datasetId": "data_crm",
            "tableId": "interactions",
        },
        "createDisposition": "CREATE_IF_NEEDED",
        "writeDisposition": "WRITE_TRUNCATE",
        "allowLargeResults": True,
        "useLegacySql": False
    }
)
Traceback:
  File "/opt/homebrew/lib/python3.8/site-packages/airflow/models/dagbag.py", line 326, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/seba/projects/composer/dags/crm/crm_interactions_import.py", line 737, in <module>
    my_bq_task = BigQueryInsertJobOperator(
  File "/opt/homebrew/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 167, in apply_defaults
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['configuration'] is required
I have resolved this by correcting the JSON specification.
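The post does not show the corrected specification, so the following is only a sketch of one likely correction, not necessarily the exact change that was made. It assumes the `configuration` argument follows the BigQuery REST API job layout, in which the query-specific settings (`destinationTable`, `createDisposition`, `writeDisposition`, `allowLargeResults`, `useLegacySql`) sit inside a nested `query` object next to the SQL text, rather than at the top level. The helper name `build_query_job_configuration` and the project id `my-project` are hypothetical and used for illustration only.

```python
from typing import Any, Dict


def build_query_job_configuration(project_id: str, sql: str) -> Dict[str, Any]:
    """Build a job configuration dict with the query settings nested under "query".

    Hypothetical helper: the key layout is assumed to mirror the BigQuery
    REST API job configuration, where "query" is an object, not a string.
    """
    return {
        "query": {
            "query": sql,
            "destinationTable": {
                "projectId": project_id,
                "datasetId": "data_crm",
                "tableId": "interactions",
            },
            "createDisposition": "CREATE_IF_NEEDED",
            "writeDisposition": "WRITE_TRUNCATE",
            "allowLargeResults": True,
            "useLegacySql": False,
        }
    }


# "my-project" is a placeholder standing in for the bq_prj variable in the question.
configuration = build_query_job_configuration(
    project_id="my-project",
    sql="{% include '/sql/load_crm_interactions.sql' %}",
)

# The dict would then be passed to the operator unchanged, for example:
# my_bq_task = BigQueryInsertJobOperator(
#     dag=dag,
#     task_id="my_bq_task",
#     gcp_conn_id="google_cloud_default",
#     configuration=configuration,
# )
```

Building the dict in a plain function keeps the structure easy to inspect or unit test without an Airflow installation; the operator call is left as a comment because it depends on the `dag` object defined elsewhere in the DAG file.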