DataprocCreateClusterOperator fails due to TypeError

Edit 1: The problem is related to the field "initialization_actions". Originally I put a string there; now I pass it the object it asks for:

from google.cloud.dataproc_v1beta2 import NodeInitializationAction

CLUSTER_CONFIG = {
    ...
    "initialization_actions": [NodeInitializationAction({
        "executable_file": <string>})]
}

Unfortunately, it still complains:

ERROR - Parameter to MergeFrom() must be instance of same class: expected google.cloud.dataproc.v1beta2.NodeInitializationAction got NodeInitializationAction.

I am trying to deploy a Dataproc cluster with airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator, but I get a cryptic TypeError.

The task is defined as follows:

CLUSTER_CONFIG = {
    "config_bucket": <my_bucket>,
    "temp_bucket": <my_bucket>,
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "c2-standard-8",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "initialization_actions": [<string>],
}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    metadata=[("ENV", ENV)],
    dag=dag)

Traceback:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataproc.py", line 603, in execute
    cluster = self._create_cluster(hook)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/dataproc.py", line 540, in _create_cluster
    metadata=self.metadata
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/dataproc.py", line 304, in create_cluster
    metadata=metadata
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/dataproc_v1beta2/services/cluster_controller/client.py", line 412, in create_cluster
    request = clusters.CreateClusterRequest(request)
  File "/opt/python3.6/lib/python3.6/site-packages/proto/message.py", line 506, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/opt/python3.6/lib/python3.6/site-packages/proto/marshal/marshal.py", line 208, in to_proto
    pb_value = rule.to_proto(value)
  File "/opt/python3.6/lib/python3.6/site-packages/proto/marshal/rules/message.py", line 32, in to_proto
    return self._descriptor(**value)
TypeError: Parameter to MergeFrom() must be instance of same class: expected google.cloud.dataproc.v1beta2.NodeInitializationAction got str

The field "initialization_actions" is not a list of strings but a list of dicts:

"initialization_actions": [{"executable_file": <string>}]
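For illustration, here is a minimal sketch of the corrected config together with a small pre-flight check, so a bare string is caught before the request reaches the Dataproc client. The script URI and the helper `check_initialization_actions` are hypothetical names made up for this example; they are not part of Airflow or the Dataproc library.

```python
# Hypothetical placeholder; substitute the GCS path of your own init script.
INIT_SCRIPT_URI = "gs://example-bucket/scripts/init.sh"

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "c2-standard-8",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    # Each entry is a dict with an "executable_file" key, not a bare string.
    "initialization_actions": [{"executable_file": INIT_SCRIPT_URI}],
}


def check_initialization_actions(cluster_config):
    """Hypothetical helper: fail early if an entry is not a dict with "executable_file"."""
    actions = cluster_config.get("initialization_actions", [])
    for index, action in enumerate(actions):
        if not isinstance(action, dict):
            raise TypeError(
                f"initialization_actions[{index}] must be a dict, "
                f"got {type(action).__name__}"
            )
        if "executable_file" not in action:
            raise ValueError(
                f"initialization_actions[{index}] is missing 'executable_file'"
            )
    return actions


# Run the check before handing CLUSTER_CONFIG to DataprocCreateClusterOperator.
check_initialization_actions(CLUSTER_CONFIG)
```

With the original config from the question (a list containing a plain string), this check raises a TypeError that names the offending index, which is easier to read than the MergeFrom() error.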