GCP Dataproc - cluster creation failing when using connectors.sh in initialization-actions
I'm creating a Dataproc cluster, and it times out when I add connectors.sh to the initialization actions.
Here are the command and the error:
NUM_WORKER=2
TYPE=n1-highmem-8
CNAME=dataproc-poc
BUCKET=dataproc-spark-karan
REGION=us-central1
ZONE=us-central1-c
IMG_VERSION=2.0.29-debian10
PROJECT=versa-kafka-poc
Karans-MacBook-Pro:dataproc-versa-sase karanalang$ gcloud beta dataproc clusters create $CNAME \
> --enable-component-gateway \
> --bucket $BUCKET \
> --region $REGION \
> --zone $ZONE \
> --no-address --master-machine-type $TYPE \
> --master-boot-disk-size 100 \
> --master-boot-disk-type pd-ssd \
> --num-workers $NUM_WORKER \
> --worker-machine-type $TYPE \
> --worker-boot-disk-type pd-ssd \
> --worker-boot-disk-size 100 \
> --image-version $IMG_VERSION \
> --scopes 'https://www.googleapis.com/auth/cloud-platform' \
> --project $PROJECT \
> --initialization-actions 'gs://dataproc-kafka/config/pip_install.sh','gs://dataproc-kafka/config/connectors.sh' \
> --metadata 'gcs-connector-version=2.0.0' \
> --metadata 'bigquery-connector-version=1.2.0' \
> --properties 'dataproc:dataproc.logging.stackdriver.job.driver.enable=true,dataproc:job.history.to-gcs.enabled=true,spark:spark.dynamicAllocation.enabled=false,spark:spark.executor.instances=6,spark:spark.executor.cores=2,spark:spark.eventLog.dir=gs://dataproc-spark-karan/joblogs,spark:spark.history.fs.logDirectory=gs://dataproc-spark-karan/joblogs'
Waiting on operation [projects/versa-kafka-poc/regions/us-central1/operations/8aa13a77-30a8-3a84-a949-16b4d8907c45].
Waiting for cluster creation operation...
WARNING: This cluster is configured to use network 'https://www.googleapis.com/compute/v1/projects/versa-kafka-poc/global/networks/default' and its associated firewall rules '[prometheus-nodeport]' which contains the following potential security vulnerability: 'port 8088 is open to the internet, this may allow arbitrary code execution via the YARN REST API. Use Component Gateway for secure remote access to the YARN UI and other cluster UIs instead: https://cloud.google.com/dataproc/docs/concepts/accessing/dataproc-gateways.'
Waiting for cluster creation operation...done.
ERROR: (gcloud.beta.dataproc.clusters.create) Operation [projects/versa-kafka-poc/regions/us-central1/operations/8aa13a77-30a8-3a84-a949-16b4d8907c45] timed out.
connectors.sh
#!/bin/bash
set -euxo pipefail
VM_CONNECTORS_HADOOP_DIR=/usr/lib/hadoop/lib
VM_CONNECTORS_DATAPROC_DIR=/usr/local/share/google/dataproc/lib
declare -A MIN_CONNECTOR_VERSIONS
MIN_CONNECTOR_VERSIONS=(
  ["bigquery"]="0.11.0"
  ["gcs"]="1.7.0")

# Starting from these versions the connector names changed:
# "...-<version>-hadoop2.jar" -> "...-hadoop2-<version>.jar"
declare -A NEW_NAME_MIN_CONNECTOR_VERSIONS
NEW_NAME_MIN_CONNECTOR_VERSIONS=(
  ["bigquery"]="0.13.5"
  ["gcs"]="1.9.5")
BIGQUERY_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/bigquery-connector-version || true)
GCS_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/gcs-connector-version || true)
UPDATED_GCS_CONNECTOR=false
is_worker() {
  local role
  role="$(/usr/share/google/get_metadata_value attributes/dataproc-role || true)"
  if [[ $role != Master ]]; then
    return 0
  fi
  return 1
}

min_version() {
  echo -e "$1\n$2" | sort -r -t'.' -n -k1,1 -k2,2 -k3,3 | tail -n1
}

validate_version() {
  local name=$1    # connector name: "bigquery" or "gcs"
  local version=$2 # connector version
  local min_valid_version=${MIN_CONNECTOR_VERSIONS[$name]}
  if [[ "$(min_version "$min_valid_version" "$version")" != "$min_valid_version" ]]; then
    echo "ERROR: $name-connector version should be greater than or equal to $min_valid_version, but was $version"
    return 1
  fi
}
update_connector() {
  local name=$1    # connector name: "bigquery" or "gcs"
  local version=$2 # connector version
  if [[ $version ]]; then
    if [[ $name == gcs ]]; then
      UPDATED_GCS_CONNECTOR=true
    fi

    # validate new connector version
    validate_version "$name" "$version"

    if [[ -d ${VM_CONNECTORS_DATAPROC_DIR} ]]; then
      local vm_connectors_dir=${VM_CONNECTORS_DATAPROC_DIR}
    else
      local vm_connectors_dir=${VM_CONNECTORS_HADOOP_DIR}
    fi

    # remove old connector
    rm -f "${vm_connectors_dir}/${name}-connector-"*

    # download new connector
    # connector name could be in one of 2 formats:
    # 1) gs://hadoop-lib/${name}/${name}-connector-hadoop2-${version}.jar
    # 2) gs://hadoop-lib/${name}/${name}-connector-${version}-hadoop2.jar
    local new_name_min_version=${NEW_NAME_MIN_CONNECTOR_VERSIONS[$name]}
    if [[ "$(min_version "$new_name_min_version" "$version")" == "$new_name_min_version" ]]; then
      local jar_name="${name}-connector-hadoop2-${version}.jar"
    else
      local jar_name="${name}-connector-${version}-hadoop2.jar"
    fi
    gsutil cp "gs://hadoop-lib/${name}/${jar_name}" "${vm_connectors_dir}/"

    # Update or create version-less connector link
    ln -s -f "${vm_connectors_dir}/${jar_name}" "${vm_connectors_dir}/${name}-connector.jar"
  fi
}
if [[ -z $BIGQUERY_CONNECTOR_VERSION ]] && [[ -z $GCS_CONNECTOR_VERSION ]]; then
  echo "ERROR: None of connector versions are specified"
  exit 1
fi

# because connectors from 1.7 branch are not compatible with previous connectors
# versions (they have the same class relocation paths) we need to update both
# of them, even if only one connector version is set
if [[ -z $BIGQUERY_CONNECTOR_VERSION ]] && [[ $GCS_CONNECTOR_VERSION == "1.7.0" ]]; then
  BIGQUERY_CONNECTOR_VERSION="0.11.0"
fi
if [[ $BIGQUERY_CONNECTOR_VERSION == "0.11.0" ]] && [[ -z $GCS_CONNECTOR_VERSION ]]; then
  GCS_CONNECTOR_VERSION="1.7.0"
fi

update_connector "bigquery" "$BIGQUERY_CONNECTOR_VERSION"
update_connector "gcs" "$GCS_CONNECTOR_VERSION"

if [[ $UPDATED_GCS_CONNECTOR != true ]]; then
  echo "GCS connector wasn't updated - no need to restart any services"
  exit 0
fi

# Restart YARN NodeManager service on worker nodes so they can pick up updated GCS connector
if is_worker; then
  systemctl kill -s KILL hadoop-yarn-nodemanager
fi
# Restarts Dataproc Agent after successful initialization
# WARNING: this function relies on undocumented and not officially supported Dataproc Agent
# "sentinel" files to determine successful Agent initialization and not guaranteed
# to work in the future. Use at your own risk!
restart_dataproc_agent() {
  # Because Dataproc Agent should be restarted after initialization, we need to wait until
  # it creates a sentinel file that signals initialization completion (success or failure)
  while [[ ! -f /var/lib/google/dataproc/has_run_before ]]; do
    sleep 1
  done
  # If Dataproc Agent didn't create a sentinel file that signals initialization
  # failure then it means that initialization succeeded and it should be restarted
  if [[ ! -f /var/lib/google/dataproc/has_failed_before ]]; then
    systemctl kill -s KILL google-dataproc-agent
  fi
}
export -f restart_dataproc_agent

# Schedule asynchronous Dataproc Agent restart so it will use updated connectors.
# It cannot be restarted synchronously because Dataproc Agent should be restarted
# after its initialization, including init actions execution, has been completed.
bash -c restart_dataproc_agent &
disown
From what I understand, connectors.sh just makes sure the correct versions of the connectors are included in the cluster.
Also - without connectors.sh, the installation works fine.
How do I debug/fix this?
TIA!
It looks like you are using an old version of the initialization action script. According to the documentation in the Dataproc GitHub repo, you can set the version of the Hadoop GCS connector without the script:
gcloud dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--metadata GCS_CONNECTOR_VERSION=2.2.2
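Note that the metadata key in this approach is the upper-case GCS_CONNECTOR_VERSION, not the lower-case gcs-connector-version key that the old script reads, and no initialization action is needed for the GCS connector at all.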
For the BigQuery connectors (Spark or Hadoop MR), use the up-to-date initialization action as follows:
--initialization-actions gs://${BUCKET}/connectors.sh \
--metadata bigquery-connector-version=1.2.0 \
--metadata spark-bigquery-connector-version=0.23.2
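Put together with your variables, the create command would look something like this (a sketch; it assumes you first copy the current connectors.sh from the GoogleCloudDataproc/initialization-actions repository, e.g. from the regional gs://goog-dataproc-initialization-actions-${REGION} bucket, into your own bucket):
gsutil cp gs://goog-dataproc-initialization-actions-${REGION}/connectors/connectors.sh gs://${BUCKET}/
gcloud dataproc clusters create ${CNAME} \
    --region ${REGION} \
    --zone ${ZONE} \
    --bucket ${BUCKET} \
    --image-version ${IMG_VERSION} \
    --metadata GCS_CONNECTOR_VERSION=2.2.2 \
    --metadata bigquery-connector-version=1.2.0 \
    --metadata spark-bigquery-connector-version=0.23.2 \
    --initialization-actions gs://${BUCKET}/connectors.sh
(plus the remaining flags from your original command)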
Note that the same repository also contains an updated pip-install initialization action.
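As for debugging in general: the output of each initialization action is staged to the cluster's Cloud Storage staging bucket, so you can inspect it even after the failed cluster is torn down. Roughly (the cluster UUID and node name are placeholders; the operation ID is the one gcloud printed in your output above):
gcloud dataproc operations describe 8aa13a77-30a8-3a84-a949-16b4d8907c45 --region=${REGION}
gsutil cat gs://${BUCKET}/google-cloud-dataproc-metainfo/<cluster-uuid>/<node-name>/dataproc-initialization-script-0_output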