如何修复 "MetadataFetchFailedException: Missing an output location for shuffle"?

How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

如果我增加我的 word2vec 模型的模型大小,我开始在我的 log 中出现这种异常:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我尝试编写自己的 "save model" 版本,如下所示:

  def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {

    println("Saving model as CSV ..")

    val vectorSize = model.getVectors.values.head.size

    println("vectorSize="+vectorSize)

    val SEPARATOR_TOKEN = " "    
    val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }

    println("Got dataArray ..")
    println("parallelize(dataArray, 10)")
    val par = sc.parallelize(dataArray, 10)
          .map(d => {

            val sb = new mutable.StringBuilder()
            sb.append(d.word)
            sb.append(SEPARATOR_TOKEN)

            for(v <- d.vector) {
              sb.append(v)
              sb.append(SEPARATOR_TOKEN)
            }
            sb.setLength(sb.length - 1)
            sb.append("\n")
            sb.toString()
          })
    println("repartition(1)")
    val rep = par.repartition(1)
    println("collect()")
    val vectorsAsString = rep.collect()

    println("Collected serialized vectors ..")    

    val cfile = new mutable.StringBuilder()

    cfile.append(vectorsAsString.length)
    cfile.append(" ")
    cfile.append(vectorSize)
    cfile.append("\n")

    val sb = new StringBuilder
    sb.append("word,")
    for(i <- 0 until vectorSize) {
      sb.append("v")
      sb.append(i.toString)
      sb.append(",")
    }
    sb.setLength(sb.length - 1)
    sb.append("\n")

    for(vectorString <- vectorsAsString) {
      sb.append(vectorString)
      cfile.append(vectorString)
    }

    println("Saving file to " + new Path(path, "data").toUri.toString)
    sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path+".csv", "data").toUri.toString)
    sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path+".cs", "data").toUri.toString)
  }

显然它的工作方式类似于他们的 current implementation - 但事实并非如此。

我想要一个 word2vec 模型。它适用于小文件,但如果模型变大则无效。

MetadataFetchFailedException 当执行器上的 MapOutputTracker 无法在本地缓存中找到分区的请求洗牌映射输出并尝试从驱动程序的 MapOutputTracker 远程获取它们时抛出

MetadataFetchFailedException。 =14=]

这可能导致一些结论:

  1. 驱动内存问题
  2. 执行者的记忆问题
  3. 执行者丢失

请查看日志以查找报告为 "Executor lost" INFO 消息的问题 and/or 查看 Web UI 的执行器页面并查看执行器如何工作。

执行器丢失的根本原因也可能是集群管理器已决定杀死行为不端的执行器(可能使用了比请求更多的内存)。

请参阅另一个问题 以获得更多见解。

简而言之,配置可能会有所帮助:

--conf spark.blacklist.enabled=true # blacklist bad machine
--conf spark.reducer.maxReqsInFlight=10 # limit concurrent requests from reducer 
--conf spark.shuffle.io.retryWait=10s # increase retry wait
--conf spark.shuffle.io.maxRetries=10 # increase retry times
--conf spark.shuffle.io.backLog=4096 # increase tcp connection wait queue length

详细解释如下。

对于MetadataFetchFailedException,通常发生在一个执行器突然被杀死或终止时,但是这个执行器有一些随机输出,然后当另一个执行器试图获取这个随机输出的元数据时,就会发生异常。

  1. 在大多数情况下,这是由于容器被 Yarn killed for 超出内存限制。所以你需要在 日志。

  2. 最常见的修复是增加memoryOverhead,默认值 是 0.1 * 执行程序内存。这对于大多数情况来说太小了。我会 建议将其设置为 0.2 * 执行程序内存。如果你有大量 执行者或 运行 另一个 sub-process,你需要更大的价值 这个。