Airflow 2 - BigQueryInsertJobOperator - how to resolve airflow.exceptions.AirflowException: Argument ['configuration'] is required

I get the following error when using BigQueryInsertJobOperator in Airflow 2.1.4:

airflow.exceptions.AirflowException: Argument ['configuration'] is required

How do I define the configuration correctly?

Code:

    my_bq_task = BigQueryInsertJobOperator(
        dag=dag,
        task_id="my_bq_task",
        gcp_conn_id="google_cloud_default",
        configuration={
            "query": "{% include '/sql/load_crm_interactions.sql' %}",
            "destinationTable": {
                "projectId": bq_prj,
                "datasetId": "data_crm",
                "tableId": "interactions",
            },
            "createDisposition": "CREATE_IF_NEEDED",
            "writeDisposition": "WRITE_TRUNCATE",
            "allowLargeResults": True,
            "useLegacySql": False
        }
    )

Traceback:

  File "/opt/homebrew/lib/python3.8/site-packages/airflow/models/dagbag.py", line 326, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/seba/projects/composer/dags/crm/crm_interactions_import.py", line 737, in <module>
    my_bq_task = BigQueryInsertJobOperator(
  File "/opt/homebrew/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 167, in apply_defaults
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['configuration'] is required

I have solved this by correcting the JSON specification passed as the configuration.
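
The post does not show the corrected specification, so the following is only a sketch of what it may have looked like. It assumes the fix was to follow the BigQuery REST API job layout, in which the SQL text and the query options (destinationTable, createDisposition, and so on) are nested inside a "query" object instead of sitting at the top level of the configuration. The helper name build_query_job_configuration and the project ID "my-project" are hypothetical and used for illustration only.

```python
def build_query_job_configuration(project_id: str) -> dict:
    """Build a BigQuery job configuration for a query job.

    Assumption: the layout follows the BigQuery REST API, where every
    query option lives under the top-level "query" key.
    """
    return {
        "query": {
            # The SQL text itself goes in the nested "query" field.
            "query": "{% include '/sql/load_crm_interactions.sql' %}",
            "destinationTable": {
                "projectId": project_id,
                "datasetId": "data_crm",
                "tableId": "interactions",
            },
            "createDisposition": "CREATE_IF_NEEDED",
            "writeDisposition": "WRITE_TRUNCATE",
            "allowLargeResults": True,
            "useLegacySql": False,
        }
    }


# "my-project" is a placeholder for the value of bq_prj in the question.
configuration = build_query_job_configuration("my-project")
```

The resulting dictionary would then be passed to the operator as configuration=configuration, with the other arguments (dag, task_id, gcp_conn_id) unchanged from the question.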