How to run Spark 3.2.0 on Google Dataproc?
Currently, Google Dataproc does not offer Spark 3.2.0 as an image; the latest available is 3.1.2. I want to use the pandas-on-Spark functionality that ships with Spark 3.2.0.
I am following these steps to use Spark 3.2.0:
- Created a local conda environment 'pyspark' that contains pyspark 3.2.0
- Exported the environment YAML with `conda env export > environment.yaml`
- Created a Dataproc cluster using this environment.yaml. The cluster is created correctly and the environment is available on the master and all workers
- Then changed the environment variables: `export SPARK_HOME=/opt/conda/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark` (to point at pyspark 3.2.0), `export SPARK_CONF_DIR=/usr/lib/spark/conf` (to use Dataproc's config files), and `export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/pyspark/bin/python` (to make the environment's packages available)
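Collected into one copyable snippet, the three exports from the last step (paths exactly as above) are:

```shell
# Point Spark at the conda env's pyspark 3.2.0, keep Dataproc's conf dir,
# and use the env's Python interpreter.
export SPARK_HOME=/opt/conda/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark
export SPARK_CONF_DIR=/usr/lib/spark/conf
export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/pyspark/bin/python
```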
Now, if I try to run the pyspark shell, I get:
21/12/07 01:25:16 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener AppStatusListener threw an exception
java.lang.NumberFormatException: For input string: "null"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
at org.apache.spark.util.Utils$.parseHostPort(Utils.scala:1126)
at org.apache.spark.status.ProcessSummaryWrapper.<init>(storeTypes.scala:527)
at org.apache.spark.status.LiveMiscellaneousProcess.doUpdate(LiveEntity.scala:924)
at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:50)
at org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1213)
at org.apache.spark.status.AppStatusListener.onMiscellaneousProcessAdded(AppStatusListener.scala:1427)
at org.apache.spark.status.AppStatusListener.onOtherEvent(AppStatusListener.scala:113)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon.$anonfun$run(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
at org.apache.spark.scheduler.AsyncEventQueue$$anon.run(AsyncEventQueue.scala:96)
However, the shell does start after this, but it does not execute code; it throws exceptions. I tried running (with `socket` imported):
set(sc.parallelize(range(10), 10).map(lambda x: socket.gethostname()).collect())
but I get:
21/12/07 01:32:15 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1638782400702_0003_01_000001 on host: monsoon-test1-w-2.us-central1-c.c.monsoon-credittech.internal. Exit status: 1. Diagnostics: [2021-12-07
01:32:13.672]Exception from container-launch.
Container id: container_1638782400702_0003_01_000001
Exit code: 1
[2021-12-07 01:32:13.717]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
ltChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
21/12/07 01:31:43 ERROR org.apache.spark.executor.YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Driver monsoon-test1-m.us-central1-c.c.monsoon-credittech.internal:44367 disassociated! Shutting down.
21/12/07 01:32:13 WARN org.apache.hadoop.util.ShutdownHookManager: ShutdownHook '$anon' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
at org.apache.hadoop.util.ShutdownHookManager.run(ShutdownHookManager.java:95)
21/12/07 01:32:13 ERROR org.apache.spark.util.Utils: Uncaught exception in thread shutdown-hook-0
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
at java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:675)
at org.apache.spark.rpc.netty.MessageLoop.stop(MessageLoop.scala:60)
at org.apache.spark.rpc.netty.Dispatcher.$anonfun$stop(Dispatcher.scala:197)
at org.apache.spark.rpc.netty.Dispatcher.$anonfun$stop$adapted(Dispatcher.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.rpc.netty.Dispatcher.stop(Dispatcher.scala:194)
at org.apache.spark.rpc.netty.NettyRpcEnv.cleanup(NettyRpcEnv.scala:331)
at org.apache.spark.rpc.netty.NettyRpcEnv.shutdown(NettyRpcEnv.scala:309)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:96)
at org.apache.spark.executor.Executor.stop(Executor.scala:335)
at org.apache.spark.executor.Executor.$anonfun$new(Executor.scala:76)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon.run(ShutdownHookManager.scala:178)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The same error repeats many times before it stops.
What am I doing wrong, and how can I use Spark 3.2.0 on Google Dataproc?
You can achieve this by:
- Creating a Dataproc cluster with an environment (`your_sample_env`) that contains pyspark 3.2 as a package
- Modifying /usr/lib/spark/conf/spark-env.sh by adding at the end:
SPARK_HOME="/opt/conda/miniconda3/envs/your_sample_env/lib/python/site-packages/pyspark"
SPARK_CONF="/usr/lib/spark/conf"
- Modifying /usr/lib/spark/conf/spark-defaults.conf by commenting out the following configs:
spark.yarn.jars=local:/usr/lib/spark/jars/*
spark.yarn.unmanagedAM.enabled=true
Now, your Spark jobs will use pyspark 3.2.
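The two file edits above can be sketched as a script. This is a demonstration on scratch copies (the demo filenames are placeholders); on the cluster the real files are /usr/lib/spark/conf/spark-env.sh and /usr/lib/spark/conf/spark-defaults.conf, and the edits need root:

```shell
# Scratch copies standing in for the real Dataproc config files.
ENV_FILE="spark-env.sh.demo"
DEFAULTS="spark-defaults.conf.demo"
printf '%s\n' 'spark.yarn.jars=local:/usr/lib/spark/jars/*' \
              'spark.yarn.unmanagedAM.enabled=true' > "$DEFAULTS"
: > "$ENV_FILE"

# Step 1: append the two variables to the end of spark-env.sh.
cat >> "$ENV_FILE" <<'EOF'
SPARK_HOME="/opt/conda/miniconda3/envs/your_sample_env/lib/python/site-packages/pyspark"
SPARK_CONF="/usr/lib/spark/conf"
EOF

# Step 2: comment out the two YARN properties in spark-defaults.conf.
sed -i -E 's/^(spark\.yarn\.(jars|unmanagedAM\.enabled))/# \1/' "$DEFAULTS"

cat "$ENV_FILE" "$DEFAULTS"
```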
Dataproc Serverless for Spark has just been released and supports Spark 3.2.0: https://cloud.google.com/dataproc-serverless
@milominderbinder's answer did not work for me in notebooks. I used the pip install initialization script provided by Google and added the following code in `main`:
function main() {
  install_pip
  pip install pyspark==3.2.0
  # Delete lines 4 and 27 of spark-defaults.conf (presumably the
  # spark.yarn.jars and spark.yarn.unmanagedAM.enabled entries)
  sed -i '4d;27d' /usr/lib/spark/conf/spark-defaults.conf
  # Point SPARK_HOME at the pip-installed pyspark, keep Dataproc's conf dir
  cat << EOF | tee -a /etc/profile.d/custom_env.sh /etc/*bashrc >/dev/null
export SPARK_HOME=/opt/conda/miniconda3/lib/python3.8/site-packages/pyspark/
export SPARK_CONF=/usr/lib/spark/conf
EOF
  # Rewrite the Jupyter kernel spec to use the pip-installed pyspark
  sed -i 's/\/usr\/lib\/spark/\/opt\/conda\/miniconda3\/lib\/python3.8\/site-packages\/pyspark\//g' /opt/conda/miniconda3/share/jupyter/kernels/python3/kernel.json
  if [[ -z "${PACKAGES}" ]]; then
    echo "WARNING: requirements empty"
    exit 0
  fi
  run_with_retry pip install --upgrade ${PACKAGES}
}
This makes it work in JupyterLab with the Python3 kernel.
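One caveat about `sed -i '4d;27d'` in that script: it deletes spark-defaults.conf entries by line number, which breaks silently if the image's file layout changes. A less brittle variant (a sketch, demonstrated here on a scratch file; on the cluster it would target /usr/lib/spark/conf/spark-defaults.conf) deletes the two properties by key instead:

```shell
# Scratch file standing in for /usr/lib/spark/conf/spark-defaults.conf.
CONF="spark-defaults.conf.demo2"
printf '%s\n' 'spark.master=yarn' \
              'spark.yarn.jars=local:/usr/lib/spark/jars/*' \
              'spark.yarn.unmanagedAM.enabled=true' > "$CONF"

# Remove the two properties by key, independent of their line numbers.
sed -i -E '/^spark\.yarn\.(jars|unmanagedAM\.enabled)=/d' "$CONF"

cat "$CONF"
```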