pySpark 容错配置

pySpark fault tolerant configuration

我在 yarn 模式的大型 Spark 集群上 运行 长时间工作(+ 3 小时)。 虚拟机工作人员 运行 spark 托管在 Google Cloud Dataproc 上,其中大部分可以在执行期间销毁(成本更低的抢占式虚拟机)。

发生这种情况时,作业会失败,因为任务在已销毁的工作程序上失败,并在失败工作程序的容器日志中显示此错误:

Executor is not registered

我试过将 spark.task.maxFailures 设置为 1000,但这似乎不是很有效:即使作业完成,任务似乎也不会自动重新分配,并且计算分配给这个特定工作人员的任务似乎回滚到初始阶段。

有没有一种方法可以让配置更容错,简单地排除无响应的执行程序并重新分配它们的任务?

如果需要,我可以包括 ressourcemanager 日志、nodemanager 和容器日志,但我认为它不相关。

这似乎是可抢占工人离开集群的方式的倒退。

问题不仅仅是不能容忍失败。在集群的整个生命周期中,可抢占式 worker 不断地被创建和销毁。每次 worker 离开时,YARN 都会等待 15m 的心跳,然后再检测到故障并重新创建容器。这可以使您的工作 运行 大大延长。

我们将在下一个版本中修复此问题。

解决方法:

以下将强制工作人员在关闭时离开集群。

创建以下脚本并将其上传到 GCS 存储桶:

#!/bin/sh
sudo sed -i "s/.*Stopping google-dataproc-agent.*/start-stop-daemon --stop --signal USR2 --quiet --oknodo --pidfile ${AGENT_PID_FILE}; sleep 5s/" \
   /etc/init.d/google-dataproc-agent

假设您将其上传到 gs://my-bucket/fix.sh

现在使用此初始化操作重新创建集群:

gcloud beta dataproc clusters create my-cluster ... \
  --initialization-actions gs://my-bucket/fix.sh

您可以通过 ssh 连接到主节点并在 yarn 节点列表上设置监视来验证这一点:

gcloud compute ssh my-cluster-m
watch yarn node -list

在另一个终端中,发出集群更新命令以减少工作人员数量并验证 yarn 节点数量是否相应更改。