Facing an issue passing the metadata field with DataprocCreateClusterOperator (Airflow 2.0)

I am running into problems installing packages on a Dataproc cluster with DataprocCreateClusterOperator while trying to upgrade to Airflow 2.0.

Error message:

ValueError: metadata was invalid: [('bigquery-connector-version', '1.1.1'), ('spark-bigquery-connector-version', '0.17.2'), ('PIP_PACKAGES', 'oyaml'), ('x-goog-api-client', 'gl-python/3.8.12 grpc/1.39.0 gax/1.31.1 gccl/airflow_v2.1.2+composer')] 

Digging further, I found [this GitHub PR](https://github.com/apache/airflow/pull/19446), where this ValueError: metadata was invalid problem is discussed.
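From that discussion, my understanding is that in the Airflow 2.x Google provider the operator-level metadata argument is gRPC request metadata (which is why the error above shows an auto-appended x-goog-api-client entry), so GCE instance metadata has to live inside the cluster config instead. A minimal sketch of what I mean, with placeholder project/cluster names and assuming the cluster_config dict follows the Dataproc ClusterConfig layout (gce_cluster_config.metadata):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Instance metadata goes inside cluster_config (gce_cluster_config.metadata);
# the operator's own metadata argument is gRPC call metadata, not GCE metadata.
CLUSTER_CONFIG = {
    "gce_cluster_config": {
        "metadata": {
            "bigquery-connector-version": "1.1.1",
            "spark-bigquery-connector-version": "0.17.2",
            "PIP_PACKAGES": "oyaml",
        },
    },
}

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster-name",
    cluster_config=CLUSTER_CONFIG,
)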

I followed that link and used the ClusterGenerator / CLUSTER_CONFIG approach to build the cluster_config for DataprocCreateClusterOperator, but now I am running into another issue, shown below:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)"
    debug_error_string = "{"created":"@1640080533.396337969","description":"Error received from peer ipv4:142.250.97.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)","grpc_status":3}"

There is not much material around about this error.

My code is below:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    tags="dataproc",
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={"gcs-connector-version" : '2.1.1' , 
                  "bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                 },
    init_actions_uris =['gs://goog-dataproc-initialization-actions-{region}/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-{region}/python/pip-install.sh']
).make()

with dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster_name",
        project_id="project_id",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster

The following DAG works as expected, after changing:

  • Cluster name (cluster_name -> cluster-name); Compute Engine resource names allow only lowercase letters, digits, and hyphens.
  • Tags, passed as a list (tags=["dataproc"]) instead of a plain string.
  • Script paths (the literal {region} placeholder replaced with the actual region, us-central1).
  • DAG definition.

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster-name",
    tags=["dataproc"],
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={"gcs-connector-version" : '2.1.1' , 
                  "bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                 },
    init_actions_uris =['gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh']
).make()


with models.DAG(
    "example_gcp_dataproc",
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster-name",
        project_id="<your-project-name>",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster
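
To double-check where ClusterGenerator puts each field before the cluster request is actually submitted, the generated dict can be rendered locally. A minimal sketch, reusing the placeholder names above and assuming the dict follows the Dataproc ClusterConfig layout (tags and metadata under gce_cluster_config; exact keys may vary slightly between provider versions):

import json

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

config = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster-name",
    # Tags must be a list, and every tag has to match the GCE pattern
    # [a-z]([-a-z0-9]{0,61}[a-z0-9])? quoted in the error message above.
    tags=["dataproc"],
    num_workers=2,
    metadata={"PIP_PACKAGES": "oyaml"},
).make()

# Network tags and instance metadata should show up under gce_cluster_config.
print(json.dumps(config, indent=2, default=str))

Printing the config this way makes it easy to catch malformed values (such as a tag that does not satisfy the pattern) before the Dataproc API rejects them.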