Facing Issue with DataprocCreateClusterOperator (Airflow 2.0)
I am trying to migrate from Airflow 1.10 to Airflow 2, where some operators have changed names, including DataprocClusterCreateOperator. Here is an excerpt of the code:
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

with dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        project_id="project_id",
        region="us-central1",
        cluster_name="cluster_name",
        tags="dataproc",
        num_workers=2,
        storage_bucket=None,
        num_masters=1,
        master_machine_type="n1-standard-4",
        master_disk_type="pd-standard",
        master_disk_size=500,
        worker_machine_type="n1-standard-4",
        worker_disk_type="pd-standard",
        worker_disk_size=500,
        properties={},
        image_version="1.5-ubuntu18",
        autoscaling_policy=None,
        idle_delete_ttl=7200,
        optional_components=['JUPYTER', 'ANACONDA'],
        metadata={"bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES": 'oyaml datalab'},
        init_actions_uris=['gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh',
                           'gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh']
    )

    create_dataproc_cluster
I get the following error:
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 325, in create_cluster
result = client.create_cluster(
File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1beta2/services/cluster_controller/client.py", line 445, in create_cluster
response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
return wrapped_func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/timeout.py", line 102, in func_with_timeout
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
return callable_(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 944, in __call__
state, call, = self._blocking(request, timeout, metadata, credentials,
File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 926, in _blocking
call = self._channel.segregated_call(
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 498, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 366, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 360, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 218, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 246, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 89, in grpc._cython.cygrpc._operate
File "src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi", line 64, in grpc._cython.cygrpc._BatchOperationTag.prepare
File "src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi", line 37, in grpc._cython.cygrpc.SendInitialMetadataOperation.c
File "src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi", line 41, in grpc._cython.cygrpc._store_c_metadata
ValueError: too many values to unpack (expected 2)
While debugging, I found that the problem is caused by the metadata parameter. Does anyone know what is wrong with this parameter, or a way to fix it?
It seems that in this version the metadata parameter is no longer of type dict. From the docs:

metadata (Sequence[Tuple[str, str]]) -- Additional metadata that is provided to the method.
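
This also matches the traceback: grpc's _store_c_metadata iterates the metadata as (key, value) pairs, and iterating a dict yields only its keys, so Python tries to unpack each key string instead. A minimal illustration of the failure mode:

for key, value in {"bigquery-connector-version": '1.1.1'}:
    pass
# ValueError: too many values to unpack (expected 2)
# Iterating a dict yields its keys; the 26-character key string
# cannot be unpacked into the two names key and value.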
Try:
metadata = [
("bigquery-connector-version", '1.1.1'),
("spark-bigquery-connector-version", '0.17.2'),
("PIP_PACKAGES", 'oyaml datalab')
]
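
Applied to the operator call from the question, that would look roughly like this (a sketch; all other arguments stay as they were):

create_dataproc_cluster = DataprocCreateClusterOperator(
    task_id="create_test_dataproc_cluster",
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    # metadata as a sequence of (key, value) tuples instead of a dict
    metadata=[
        ("bigquery-connector-version", '1.1.1'),
        ("spark-bigquery-connector-version", '0.17.2'),
        ("PIP_PACKAGES", 'oyaml datalab')
    ],
    # ... remaining arguments as in the question
)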
EDIT

Based on this issue, you'll need to generate the cluster config with ClusterGenerator and then pass it to DataprocCreateClusterOperator:
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
)

# ClusterGenerator takes the cluster-level settings; task_id is an
# operator argument and does not belong here. Note it takes a zone,
# not a region.
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id="project_id",
    zone="us-central1-a",
    metadata={'PIP_PACKAGES': 'yaml datalab'},
    # ....
).make()

create_cluster_operator = DataprocCreateClusterOperator(
    task_id='create_dataproc_cluster',
    cluster_name="test",
    project_id="project_id",
    region="us-central1",
    cluster_config=CLUSTER_GENERATOR_CONFIG,
)
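
For completeness, here is a minimal sketch of how this could be wired into a DAG; the DAG id, schedule, and metadata values are illustrative, not from the original question. Note the division of labor: metadata passed to ClusterGenerator becomes GCE instance metadata inside the generated cluster config, while the operator's own metadata argument is forwarded to the underlying gRPC call, which is why the dict there raised the ValueError above.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
)

with DAG(
    dag_id="dataproc_create_cluster_example",  # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Cluster-level settings, including GCE metadata and init actions,
    # go through ClusterGenerator, which returns a plain config dict.
    cluster_config = ClusterGenerator(
        project_id="project_id",
        zone="us-central1-a",
        num_workers=2,
        image_version="1.5-ubuntu18",
        optional_components=['JUPYTER', 'ANACONDA'],
        metadata={"PIP_PACKAGES": 'oyaml datalab'},
        init_actions_uris=[
            'gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh'
        ],
    ).make()

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster_name",
        project_id="project_id",
        region="us-central1",
        cluster_config=cluster_config,
    )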