Packaging PySpark with PEX environment on Dataproc
I'm trying to package a PySpark job with PEX to run on Google Cloud Dataproc, but I'm getting a Permission Denied error.
I've packaged my third-party and local dependencies into env.pex, and an entry point that uses those dependencies into main.py. I then gsutil cp those two files up to gs://<PATH> and run the submission script shown after the sketch below.
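For reference, the copy step could also be done with the google-cloud-storage client instead of gsutil (just a sketch; the bucket name is hypothetical):
from google.cloud import storage

# Hypothetical bucket; upload the PEX and the entry point side by side.
client = storage.Client()
bucket = client.bucket("my-bucket")
for name in ("env.pex", "main.py"):
    bucket.blob(name).upload_from_filename(name)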
from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def submit_job(project_id: str, region: str, cluster_name: str):
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = job_client.submit_job_as_operation(
        request={
            "project_id": project_id,
            "region": region,
            "job": {
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "main_python_file_uri": "gs://<PATH>/main.py",
                    "file_uris": ["gs://<PATH>/env.pex"],
                    "properties": {
                        "spark.pyspark.python": "./env.pex",
                        "spark.executorEnv.PEX_ROOT": "./.pex",
                    },
                },
            },
        }
    )
The error I get is:
Exception in thread "main" java.io.IOException: Cannot run program "./env.pex": error=13, Permission denied
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:97)
at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=13, Permission denied
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 14 more
Should I expect packaging my environment like this to work? I don't see a way to change the permissions of files included as file_uris in the PySpark job config, and I haven't found any Google Cloud documentation on packaging with PEX, although the PySpark official docs include this guide.
Any help is appreciated. Thanks!
You can always run a PEX file with a compatible interpreter, so instead of specifying ./env.pex as the program, you can try python env.pex. That does not require env.pex to be executable.
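For example, locally (a sketch; this assumes env.pex was built without an entry point, in which case it emulates a Python interpreter and accepts flags like -c):
import subprocess

# A PEX is a zipapp, so a compatible interpreter can execute it even
# when the file itself has no execute bit set.
subprocess.run(
    ["python", "env.pex", "-c", "import sys; print(sys.prefix)"],
    check=True,
)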
In the end I wasn't able to run the pex directly, but for now there is a workaround, suggested by a user in the pants slack community (thank you!).
The workaround is to unpack the pex into a venv in a cluster initialization script.
The init script, which gets gsutil copied to gs://<PATH TO INIT SCRIPT>:
#!/bin/bash
set -exo pipefail

readonly PEX_ENV_FILE_URI=$(/usr/share/google/get_metadata_value attributes/PEX_ENV_FILE_URI || true)
readonly PEX_FILES_DIR="/pexfiles"
readonly PEX_ENV_DIR="/pexenvs"

function err() {
  echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $*" >&2
  exit 1
}

function install_pex_into_venv() {
  local -r pex_name=${PEX_ENV_FILE_URI##*/}
  local -r pex_file="${PEX_FILES_DIR}/${pex_name}"
  local -r pex_venv="${PEX_ENV_DIR}/${pex_name}"

  echo "Installing pex from ${pex_file} into venv ${pex_venv}..."
  mkdir -p "${PEX_FILES_DIR}"  # ensure the download directory exists
  gsutil cp "${PEX_ENV_FILE_URI}" "${pex_file}"
  # Expand the PEX into a regular virtualenv using its bundled tools.
  PEX_TOOLS=1 python "${pex_file}" venv --compile "${pex_venv}"
}

function main() {
  if [[ -z "${PEX_ENV_FILE_URI}" ]]; then
    err "ERROR: Must specify PEX_ENV_FILE_URI metadata key"
  fi
  install_pex_into_venv
}

main
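One caveat worth noting: as far as I know, the PEX_TOOLS=1 venv invocation above only works if the archive was built with pex's --include-tools option (for example pex -D src -r requirements.txt --include-tools -o env.pex); without it the venv tool is not bundled into the PEX.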
Start the cluster, running the init script to unpack the pex into a venv on each node:
from google.cloud import dataproc_v1 as dataproc


def start_cluster(project_id: str, region: str, cluster_name: str):
    cluster_client = dataproc.ClusterControllerClient(...)
    operation = cluster_client.create_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster": {
                "project_id": project_id,
                "cluster_name": cluster_name,
                "config": {
                    "master_config": <CONFIG>,
                    "worker_config": <CONFIG>,
                    "initialization_actions": [
                        {
                            "executable_file": "gs://<PATH TO INIT SCRIPT>",
                        },
                    ],
                    "gce_cluster_config": {
                        "metadata": {"PEX_ENV_FILE_URI": "gs://<PATH>/env.pex"},
                    },
                },
            },
        }
    )
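create_cluster returns a long-running operation, so the caller can block until the init action has finished on every node before submitting work; a sketch of the tail end of start_cluster:
    # Wait for provisioning, including the init script, to complete;
    # result() raises if cluster creation failed.
    cluster = operation.result()
    print(f"Cluster ready: {cluster.cluster_name}")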
Then submit the job, using the unpacked pex venv to run the PySpark job:
def submit_job(project_id: str, region: str, cluster_name: str):
    job_client = dataproc.JobControllerClient(...)
    operation = job_client.submit_job_as_operation(
        request={
            "project_id": project_id,
            "region": region,
            "job": {
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "main_python_file_uri": "gs://<PATH>/main.py",
                    "properties": {
                        "spark.pyspark.python": "/pexenvs/env.pex/bin/python",
                    },
                },
            },
        }
    )
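submit_job_as_operation likewise returns an operation handle; a sketch of waiting on it and locating the driver output (driver_output_resource_uri is a field on the Dataproc Job message):
    # Block until the job reaches a terminal state, then report where the
    # driver's output landed in GCS.
    job = operation.result()
    print(f"Driver output: {job.driver_output_resource_uri}")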