Spark partitioning breaks the lazy evaluation chain and triggers an error which I cannot catch

When repartitioning, Spark breaks the lazy evaluation chain and triggers an error that I cannot control/catch.
import org.apache.spark.HashPartitioner

// simulation of reading a stream from S3
def readFromS3(partition: Int): Iterator[(Int, String)] = {
  Iterator.tabulate(3) { idx =>
    // simulate an error only on partition 3, record 2
    (idx, if (partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition")
  }
}

val rdd = sc.parallelize(Seq(1, 2, 3, 4))
  .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))

// I can do whatever I want here

// this is what triggers the evaluation of the iterator
val partitionedRdd = rdd.partitionBy(new HashPartitioner(2))

// I can do whatever I want here

// desperately trying to catch the exception
partitionedRdd.foreachPartition { iter =>
  try {
    iter.foreach(println)
  } catch {
    case _: Throwable => println("error caught")
  }
}
Before commenting, please note:

- this is an oversimplification of my real-world application
- I know I could read from S3 differently and that I should use sc.textFile; I have no control over that and cannot change it
- I understand where the problem lies: when partitioning, Spark breaks the lazy evaluation chain and triggers the error. And I do have to partition!
- I am not saying Spark has a bug; Spark needs to evaluate the records in order to shuffle them
- the only places where I can intervene are:
  - between reading from S3 and partitioning
  - after partitioning
- I can write my own custom partitioner

Given the constraints above, is this solvable? Is there a workaround?
The only solution I could find is an EvalAheadIterator (an iterator that evaluates the head of the buffer before iterator.next is called):
import scala.collection.AbstractIterator
import scala.util.control.NonFatal

class EvalAheadIterator[+A](iter: Iterator[A]) extends AbstractIterator[A] {

  private val bufferedIter: BufferedIterator[A] = iter.buffered

  override def hasNext: Boolean =
    if (bufferedIter.hasNext) {
      try {
        bufferedIter.head // evaluate the head and trigger potential exceptions
        true
      } catch {
        case NonFatal(e) =>
          println("caught exception ahead of time")
          false
      }
    } else {
      false
    }

  override def next(): A = bufferedIter.next()
}
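As a quick sanity check outside Spark (a minimal sketch, assuming the EvalAheadIterator class above is in scope), the failing element is detected in hasNext, so iteration simply stops early instead of throwing:

// the third element throws when evaluated, just like the S3 simulation above
val failing = Iterator.tabulate(3) { idx =>
  if (idx == 2) throw new RuntimeException("boom") else s"elem $idx"
}

// prints "elem 0", "elem 1", then "caught exception ahead of time"; no exception escapes
new EvalAheadIterator(failing).foreach(println)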
Now we apply the EvalAheadIterator inside a mapPartitions:
// simulation of reading a stream from S3
def readFromS3(partition: Int): Iterator[(Int, String)] = {
  Iterator.tabulate(3) { idx =>
    // simulate an error only on partition 3, record 2
    (idx, if (partition == 3 && idx == 2) throw new RuntimeException("error") else s"elem $idx on partition $partition")
  }
}

val rdd = sc.parallelize(Seq(1, 2, 3, 4))
  .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))
  .mapPartitions(iter => new EvalAheadIterator(iter))

// I can do whatever I want here

// this is what triggers the evaluation of the iterator
val partitionedRdd = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

// I can do whatever I want here

// desperately trying to catch the exception
partitionedRdd.foreachPartition { iter =>
  try {
    iter.foreach(println)
  } catch {
    case _: Throwable => println("error caught")
  }
}
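One caveat of this approach: the failing record, and anything after it in that partition, is silently dropped, because hasNext returns false as soon as the head fails to evaluate. If you need visibility into how often that happens, a hedged variant is to count the swallowed failures with a Spark accumulator. The sketch below follows the same eval-ahead idea; CountingEvalAheadIterator and readFailures are my own illustrative names, not part of the original code:

import org.apache.spark.util.LongAccumulator
import scala.collection.AbstractIterator
import scala.util.control.NonFatal

// Same eval-ahead idea, but every swallowed failure is recorded in an accumulator
// so the driver can see how many partitions were cut short.
class CountingEvalAheadIterator[+A](iter: Iterator[A], errors: LongAccumulator)
    extends AbstractIterator[A] {

  private val bufferedIter: BufferedIterator[A] = iter.buffered

  override def hasNext: Boolean =
    bufferedIter.hasNext && {
      try {
        bufferedIter.head // force the head; a failing record ends this partition early
        true
      } catch {
        case NonFatal(_) =>
          errors.add(1)
          false
      }
    }

  override def next(): A = bufferedIter.next()
}

val readFailures = sc.longAccumulator("readFailures")

val safeRdd = sc.parallelize(Seq(1, 2, 3, 4))
  .mapPartitionsWithIndex((partitionIndex, iter) => readFromS3(partitionIndex))
  .mapPartitions(iter => new CountingEvalAheadIterator(iter, readFailures))

safeRdd.partitionBy(new org.apache.spark.HashPartitioner(2)).foreachPartition(_.foreach(println))

// accumulator value is only reliable after the action above has completed
println(s"read failures detected: ${readFailures.value}")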