Facing Issue with DataprocCreateClusterOperator (Airflow 2.0)

I am trying to migrate from Airflow 1.10 to Airflow 2, which includes a name change for some operators, DataprocClusterCreateOperator among them. Here is an excerpt of the code:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

with dag:

    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id = "create_test_dataproc_cluster",
        project_id="project_id",
        region="us-central1",
        cluster_name="cluster_name",
        tags="dataproc",
        num_workers=2,
        storage_bucket=None,
        num_masters=1,
        master_machine_type="n1-standard-4",
        master_disk_type="pd-standard",
        master_disk_size=500,
        worker_machine_type="n1-standard-4",
        worker_disk_type="pd-standard",
        worker_disk_size=500,
        properties={},
        image_version="1.5-ubuntu18",
        autoscaling_policy=None,
        idle_delete_ttl=7200,
        optional_components=['JUPYTER', 'ANACONDA'],
        metadata={"bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'oyaml datalab'},
        init_actions_uris =['gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh']
        )
        
    create_dataproc_cluster 

I am getting the below error:

File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 325, in create_cluster
    result = client.create_cluster(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1beta2/services/cluster_controller/client.py", line 445, in create_cluster
    response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/timeout.py", line 102, in func_with_timeout
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 944, in __call__
    state, call, = self._blocking(request, timeout, metadata, credentials,
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 926, in _blocking
    call = self._channel.segregated_call(
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 498, in grpc._cython.cygrpc.Channel.segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 366, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 360, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 218, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 246, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 89, in grpc._cython.cygrpc._operate
  File "src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi", line 64, in grpc._cython.cygrpc._BatchOperationTag.prepare
  File "src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi", line 37, in grpc._cython.cygrpc.SendInitialMetadataOperation.c
  File "src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi", line 41, in grpc._cython.cygrpc._store_c_metadata
ValueError: too many values to unpack (expected 2)

While debugging, I found that the issue is caused by the metadata parameter. Does anyone know what is wrong with this parameter, or how to fix it?

It seems that in this version the metadata parameter is no longer a dict. From the docs:

metadata (Sequence[Tuple[str, str]]) -- Additional metadata that is provided to the method.
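
This also explains the exact ValueError in the traceback: grpc unpacks each metadata item as a (key, value) pair, but iterating over a dict yields only its keys. A minimal illustration in plain Python (no Airflow or grpc involved, just the same unpacking):

metadata = {"bigquery-connector-version": "1.1.1"}

# Iterating a dict yields only its keys, so unpacking a key string such as
# "bigquery-connector-version" into (key, value) fails with
# "ValueError: too many values to unpack (expected 2)" -- the error above.
for key, value in metadata:
    pass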

Try:

metadata = [
    ("bigquery-connector-version", '1.1.1'),
    ("spark-bigquery-connector-version", '0.17.2'),
    ("PIP_PACKAGES", 'oyaml datalab')
]
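
If you prefer to keep the dict, converting it is a one-liner (plain Python, nothing provider-specific assumed):

metadata = list({
    "bigquery-connector-version": "1.1.1",
    "spark-bigquery-connector-version": "0.17.2",
    "PIP_PACKAGES": "oyaml datalab",
}.items())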

EDIT

According to this issue, the operator's metadata argument is additional metadata sent with the gRPC request (as the docs excerpt above says), not the VM metadata that the init actions read. To set VM metadata such as PIP_PACKAGES, you'll need to generate a cluster config with ClusterGenerator and pass it to DataprocCreateClusterOperator:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    task_id = "create_test_dataproc_cluster",
    project_id="project_id",
    region="us-central1",
    metadata={'PIP_PACKAGES': 'yaml datalab'},
    # ....
).make()

create_cluster_operator = DataprocCreateClusterOperator(
    task_id='create_dataproc_cluster',
    cluster_name="test",
    project_id="project_id",
    region="us-central1",
    cluster_config=CLUSTER_GENERATOR_CONFIG,
)
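
ClusterGenerator.make() returns a plain dict, so you can inspect the generated config before handing it to the operator. A minimal sketch (assuming the google provider is installed; the exact layout of the generated dict may vary between provider versions):

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

config = ClusterGenerator(
    project_id="project_id",
    num_workers=2,
    image_version="1.5-ubuntu18",
    metadata={"PIP_PACKAGES": "oyaml datalab"},
).make()

# In recent provider versions the VM metadata should appear under the
# gce_cluster_config section of the generated cluster config.
print(config)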