Spark partitioning breaks the lazy evaluation chain and triggers an error which I cannot catch

When repartitioning, Spark breaks the lazy evaluation chain and triggers an error that I cannot control or catch.

//simulation of reading a stream from s3
def readFromS3(partition: Int) : Iterator[(Int, String)] = {
    Iterator.tabulate(3){idx => 
        // simulate an error only on partition 3 record 2
        (idx, if(partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition" )
    }    
}

val rdd = sc.parallelize(Seq(1,2,3,4), 4) // force 4 partitions so the simulated failure on partition 3 is actually hit
            .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))

// I can do whatever I want here

//this is what triggers the evaluation of the iterator 
val partitionedRdd = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

// I can do whatever I want here

//desperately trying to catch the exception 
partitionedRdd.foreachPartition{ iter => 
    try{
        iter.foreach(println)
    }catch{
        case _ => println("error caught")
    }
}

Before you comment, please note:

  1. This is an oversimplification of my real-world application
  2. I know I could read from S3 differently and that I should use sc.textFile. I have no control over that and I cannot change it.
  3. I understand where the problem is: when partitioning, Spark breaks the lazy evaluation chain and triggers the error. And I have to partition!
  4. I am not saying Spark has a bug; Spark needs to evaluate the records in order to shuffle them
  5. I can only do whatever I want:
    • between reading from S3 and partitioning
    • after partitioning
  6. I can write my own custom Partitioner

Given the constraints above, can I work around this problem? Is there a solution?

The only solution I could find is to use an EvalAheadIterator (an iterator that evaluates the head of the buffer before iterator.next is called):

import scala.collection.AbstractIterator
import scala.util.control.NonFatal

class EvalAheadIterator[+A](iter : Iterator[A]) extends AbstractIterator[A] {

  private val bufferedIter  : BufferedIterator[A] = iter.buffered

  override def hasNext: Boolean =
    if(bufferedIter.hasNext){
      try{
          bufferedIter.head //evaluate the head and trigger potential exceptions
          true
      }catch{
          case NonFatal(e) =>
            println("caught exception ahead of time")
            false
      }
    }else{
        false
    }


  override def next() : A = bufferedIter.next()
}
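
As a quick sanity check outside Spark (a minimal sketch, not part of the actual job), the wrapper should stop cleanly instead of throwing once it reaches the failing element:

// plain-Scala check: the failing element is detected in hasNext, not thrown from next()
val failing = Iterator.tabulate(3){ idx =>
    if(idx == 2) throw new RuntimeException("error") else s"elem $idx"
}

new EvalAheadIterator(failing).foreach(println)
// expected: "elem 0", "elem 1", then "caught exception ahead of time"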

Now we apply the EvalAheadIterator inside mapPartitions:

//simulation of reading a stream from s3
def readFromS3(partition: Int) : Iterator[(Int, String)] = {
    Iterator.tabulate(3){idx => 
        // simulate an error only on partition 3 record 2
        (idx, if(partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition" )
    }    
}

val rdd = sc.parallelize(Seq(1,2,3,4), 4) // force 4 partitions so the simulated failure on partition 3 is actually hit
            .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))
            .mapPartitions{iter => new EvalAheadIterator(iter)}

// I can do whatever I want here

//this is what triggers the evaluation of the iterator 
val partitionedRdd = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

// I can do whatever I want here

//desperately trying to catch the exception 
partitionedRdd.foreachPartition{ iter => 
    try{
        iter.foreach(println)
    }catch{
        case _ => println("error caught")
    }
}
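
For completeness, a variation on the same idea (only a sketch; the sentinel key -1 and the stop-after-the-first-failure behaviour are arbitrary choices for illustration): instead of silently dropping the failing record, materialise the failure as data before the shuffle, so it survives partitionBy and can be inspected on the other side.

import scala.util.{Failure, Success, Try}

val safeRdd = sc.parallelize(Seq(1,2,3,4), 4)
            .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))
            .mapPartitions{ iter =>
                // wrap the evaluation of every record in Try so the exception becomes a value
                new Iterator[(Int, Try[String])] {
                    private var failed = false
                    override def hasNext: Boolean = !failed && iter.hasNext
                    override def next(): (Int, Try[String]) = Try(iter.next()) match {
                        case Success((key, value)) => (key, Success(value))
                        case Failure(e) =>
                            failed = true        // emit a single Failure and stop, like EvalAheadIterator does
                            (-1, Failure(e))     // -1 is an arbitrary sentinel key for failed records
                    }
                }
            }

val safePartitioned = safeRdd.partitionBy(new org.apache.spark.HashPartitioner(2))

safePartitioned.foreachPartition{ iter =>
    iter.foreach{
        case (key, Success(value)) => println(s"$key -> $value")
        case (key, Failure(e))     => println(s"record failed while reading: ${e.getMessage}")
    }
}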