云作曲家任务无法创建数据处理集群

cloud composer task unable to create dataproc cluster

我正在尝试使用云作曲家运算符创建 dataproc 集群。 这是我的 DAG 的样子:

default_dag_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['****************'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
  }
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 10},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 10},
    },
}

with models.DAG(
        'PanelSettings_dag',
        schedule_interval="@daily",
        default_args=default_dag_args) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        gcp_conn_id='google-dataproc',
        project_id=GCP_PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )

我已经在 airflow 上创建了 dataproc 连接,并为服务帐户授予了 dataproc 管理员和存储管理员角色。 没有这个连接,我得到了一个错误:

Getting connection using `google.auth.default()` since no key file is defined for hook.

现在我收到错误:

[2021-06-16 21:30:48,109] {taskinstance.py:1152} ERROR - 501 Received http2 header with status: 404
Traceback (most recent call last)
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 73, in error_remapped_callabl
    return callable_(*args, **kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/grpc/_channel.py", line 923, in __call_
    return _end_unary_response_blocking(state, call, False, None
  File "/opt/python3.6/lib/python3.6/site-packages/grpc/_channel.py", line 826, in _end_unary_response_blockin
    raise _InactiveRpcError(state
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with
    status = StatusCode.UNIMPLEMENTE
    details = "Received http2 header with status: 404
    debug_error_string = "{"created":"@1623879048.108981125","description":"Received http2 :status header with non-200 OK status","file":"src/core/ext/filters/http/client/http_client_filter.cc","file_line":129,"grpc_message":"Received http2 header with status: 404","grpc_status":12,"value":"404"}

我是气流的新手。有人可以帮助调试这个。无法理解我做错了什么。

错误是在区域字段中输入了区域名称。 我纠正了它并且它起作用了。 如果提到错误“region not found/not exists”,那将会很有帮助。