[随机出现][Spark ML ALS][AWS EMR] FileNotFoundException in checkpoint folder but file exists

[Randomly appear][Spark ML ALS][AWS EMR] FileNotFoundException in checkpoint folder but file exists

我正在 运行在 AWS EMR 上计划(每天一次)spark 应用程序,该应用程序是基于 spark.ml.recommendation.ALS 的推荐算法,数据位于 AWS S3 上,该应用程序向一组用户输出推荐。

为了保证迭代算法运行稳健,我使用了spark的checkpoint函数。我在 AWS S3 上设置了检查点文件夹。

有时一切正常。但有时,spark 应用程序无法在检查点文件夹中找到该文件,即使该文件确实存在。不知道为什么。

这是一个典型的错误日志:

19/10/30 13:46:01 WARN TaskSetManager: Lost task 5.0 in stage 873.0 (TID 12169, ip-10-79-9-182.us-west-2.compute.internal, executor 5): java.io.FileNotFoundException: No such file or directory: s3a://bucket-name/checkpoint/8f63442c-dd06-45d8-8e3a-ec30634b1a2f/rdd-2166/part-00005 at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642) at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521) at org.apache.spark.rdd.ReliableCheckpointRDD$.readCheckpointFile(ReliableCheckpointRDD.scala:292) at org.apache.spark.rdd.ReliableCheckpointRDD.compute(ReliableCheckpointRDD.scala:100) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

我检查过 s3a://bucket-name/checkpoint/8f63442c-dd06-45d8-8e3a-ec30634b1a2f/rdd-2166/part-00005 确实存在于 S3 存储中。

我的详细步骤如下:

  1. 在 s3 上创建一个检查点文件夹;
  2. 将spark的CheckpointDir设置为刚刚创建的文件夹;
  3. 运行算法;
  4. 删除检查点文件夹进行清理。

这是我的 Scala 代码:

//step 1
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)

//step 2
sparkContext.setCheckpointDir(pathString)

//step 3
//... lots of data that not so relevant
val als = new ALS()
      .setRank(10)
      .setMaxIter(20)
      .setUserCol("userId")
      .setItemCol("clubId")
      .setRatingCol("rating")
      .setCheckpointInterval(10)
      .setColdStartStrategy("drop")
      .setPredictionCol("prediction")
//... another lots of data that not so relevant

//step 4
fileSystem.delete(path, recursive = true)

S3 最终是一致的 - 如果客户端在创建文件之前执行 HEAD,有时 404s 可以缓存在负载平衡器中 - 然后在随后的 HEAD/GET 请求中(a)返回 404(b)缓存条目已刷新,因此它停留在

附近

S3A 连接器最近进行了大量工作以尝试消除此问题 HADOOP-16490 及相关),但尚未发货。虽然它在消除 s3a 连接器中的问题方面做了很多工作,但它仍然可能容易受到 spark 使用代码的怪癖的影响。有人应该检查检查点以确保它创建的文件具有 overwrite=true。

同时:如果您使用 hadoop 3.2.x 二进制文件并使用 S3Guard 来获得一致的列表,它应该知道足以在此处重试 - 您可能只需要将重试间隔调整得更大一些以便 URL 保持足够长的时间以清除缓存。

否则,请尝试在创建文件和尝试重命名或打开之间的工作流程中添加 30-60 秒的睡眠时间,看看是否有帮助。

问题已解决。 我将检查点文件夹的位置从使用 s3 更改为 hdfs,现在程序每次都能成功运行。 s3 是最终一致的,所以它不适合检查点文件夹。

来自

val pathString = "s3a://bucket-name/checkpoint"

val pathString = "hdfs:///checkpoint"