dataproc 上的 Spark 流抛出 FileNotFoundException
Spark streaming on dataproc throws FileNotFoundException
当我尝试将 Spark 流作业提交到 google dataproc 集群时,出现此异常:
16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
满输出here。
这个错误似乎是在 spark-env.sh 中没有正确定义 hadoop 配置时发生的 - ,
它可以在某处配置吗?关于如何解决它的任何指示?
运行 相同的代码在本地模式下工作正常:
sparkConf.setMaster("local[4]")
对于其他上下文:作业是这样调用的:
gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
这是样板设置代码:
lazy val conf = {
val c = new SparkConf().setAppName(this.getClass.getName)
c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)
if (isLocal) c.setMaster("local[4]")
c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
c.set("spark.streaming.blockInterval", "1s")
}
lazy val ssc = if (checkPointingEnabled) {
StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
} else {
createStreamingContext()
}
private def getCheckPointDirectory: String = {
if (isLocal) localCheckPointPath else checkPointPath
}
private def createStreamingContext(): StreamingContext = {
val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
s.checkpoint(getCheckPointDirectory)
s
}
提前致谢
这可能不是您第一次 运行 使用给定检查点目录的作业,因为检查点目录中已经包含一个检查点?
发生这种情况是因为检查点硬编码了用于提交 YARN 应用程序的确切 jarfile 参数,并且当 运行在 Dataproc 上使用指向 GCS 的 --jars
标志时,这实际上是句法Dataproc 的 sugar 会自动将您的 jarfile 从 GCS 暂存到本地文件路径 /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar
中,该路径仅在单个作业期间临时使用 -运行,因为 Spark 无法直接调用 jarfile不在本地暂存的 GCS。
但是,在后续作业中,先前的 tmp jarfile 将被删除,但新作业会尝试引用硬编码到检查点数据中的旧位置。
检查点数据中的硬编码还导致其他问题;例如,Dataproc 还使用 YARN "tags" 来跟踪作业,如果在新的 YARN 应用程序中重复使用旧 Dataproc 作业的 "tag",则会与 YARN 发生冲突。对于 运行 您的流应用程序,您需要先清除检查点目录(如果可能)以从头开始,然后:
- 在开始作业之前,您必须将作业 jarfile 放在主节点上的某个位置,然后您的“--jar”标志必须指定 "file:///path/on/master/node/to/jarfile.jar".
当你指定一个 "file:///" 路径时,dataproc 知道它已经在主节点上,所以它不会重新进入 /tmp 目录,所以在这种情况下,检查点指向一些是安全的修复了 master 上的本地目录。
您可以使用 init 操作来执行此操作,也可以提交快速 pig 作业(或者只是通过 ssh 进入主服务器并下载该 jar 文件):
# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"
# Submit the first attempt of the job
gcloud dataproc jobs submit spark --cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
- Dataproc 在幕后依赖 spark.yarn.tags 来跟踪与作业关联的 YARN 应用程序。但是,检查点包含一个陈旧的 spark.yarn.tags,这会导致 Dataproc 混淆似乎与旧作业相关联的新应用程序。
目前,只有 "cleans up" 个可疑的 YARN 应用程序只要最近被杀死的 jobid 保存在内存中,所以重新启动 dataproc 代理将解决这个问题。
# Kill the job through the UI or something before the next step.
# Now use "pig sh" to restart the dataproc agent
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "sh systemctl restart google-dataproc-agent.service"
# Re-run your job without needing to change anything else,
# it'll be fine now if you ever need to resubmit it and it
# needs to recover from the checkpoint again.
请记住,根据检查点的性质,这意味着您将无法更改在后续 运行 中传递的参数,因为检查点恢复用于破坏您的命令行设置.
您也可以运行 yarn 集群模式下的作业,以避免将jar 添加到您的master 机器上。潜在的权衡是火花驱动程序将 运行 在工作节点而不是主节点中。
当我尝试将 Spark 流作业提交到 google dataproc 集群时,出现此异常:
16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
满输出here。
这个错误似乎是在 spark-env.sh 中没有正确定义 hadoop 配置时发生的 -
它可以在某处配置吗?关于如何解决它的任何指示?
运行 相同的代码在本地模式下工作正常:
sparkConf.setMaster("local[4]")
对于其他上下文:作业是这样调用的:
gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
这是样板设置代码:
lazy val conf = {
val c = new SparkConf().setAppName(this.getClass.getName)
c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)
if (isLocal) c.setMaster("local[4]")
c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
c.set("spark.streaming.blockInterval", "1s")
}
lazy val ssc = if (checkPointingEnabled) {
StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
} else {
createStreamingContext()
}
private def getCheckPointDirectory: String = {
if (isLocal) localCheckPointPath else checkPointPath
}
private def createStreamingContext(): StreamingContext = {
val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
s.checkpoint(getCheckPointDirectory)
s
}
提前致谢
这可能不是您第一次 运行 使用给定检查点目录的作业,因为检查点目录中已经包含一个检查点?
发生这种情况是因为检查点硬编码了用于提交 YARN 应用程序的确切 jarfile 参数,并且当 运行在 Dataproc 上使用指向 GCS 的 --jars
标志时,这实际上是句法Dataproc 的 sugar 会自动将您的 jarfile 从 GCS 暂存到本地文件路径 /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar
中,该路径仅在单个作业期间临时使用 -运行,因为 Spark 无法直接调用 jarfile不在本地暂存的 GCS。
但是,在后续作业中,先前的 tmp jarfile 将被删除,但新作业会尝试引用硬编码到检查点数据中的旧位置。
检查点数据中的硬编码还导致其他问题;例如,Dataproc 还使用 YARN "tags" 来跟踪作业,如果在新的 YARN 应用程序中重复使用旧 Dataproc 作业的 "tag",则会与 YARN 发生冲突。对于 运行 您的流应用程序,您需要先清除检查点目录(如果可能)以从头开始,然后:
- 在开始作业之前,您必须将作业 jarfile 放在主节点上的某个位置,然后您的“--jar”标志必须指定 "file:///path/on/master/node/to/jarfile.jar".
当你指定一个 "file:///" 路径时,dataproc 知道它已经在主节点上,所以它不会重新进入 /tmp 目录,所以在这种情况下,检查点指向一些是安全的修复了 master 上的本地目录。
您可以使用 init 操作来执行此操作,也可以提交快速 pig 作业(或者只是通过 ssh 进入主服务器并下载该 jar 文件):
# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"
# Submit the first attempt of the job
gcloud dataproc jobs submit spark --cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false
- Dataproc 在幕后依赖 spark.yarn.tags 来跟踪与作业关联的 YARN 应用程序。但是,检查点包含一个陈旧的 spark.yarn.tags,这会导致 Dataproc 混淆似乎与旧作业相关联的新应用程序。
目前,只有 "cleans up" 个可疑的 YARN 应用程序只要最近被杀死的 jobid 保存在内存中,所以重新启动 dataproc 代理将解决这个问题。
# Kill the job through the UI or something before the next step.
# Now use "pig sh" to restart the dataproc agent
gcloud dataproc jobs submit pig --cluster my-test-cluster \
--execute "sh systemctl restart google-dataproc-agent.service"
# Re-run your job without needing to change anything else,
# it'll be fine now if you ever need to resubmit it and it
# needs to recover from the checkpoint again.
请记住,根据检查点的性质,这意味着您将无法更改在后续 运行 中传递的参数,因为检查点恢复用于破坏您的命令行设置.
您也可以运行 yarn 集群模式下的作业,以避免将jar 添加到您的master 机器上。潜在的权衡是火花驱动程序将 运行 在工作节点而不是主节点中。