想知道为什么空内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化异常
wondering why empty inner iterator causes not serializable exception with mapPartitionsWithIndex
我一直在试验 Spark 的 mapPartitionsWithIndex,我 运行 遇到问题时
试图 return 元组的迭代器本身包含一个空迭代器。
我尝试了几种不同的构造内部迭代器的方法[通过 Iterator() 和 List(...).iterator],以及
所有的道路都让我得到这个错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon
Serialization stack:
- object not serializable (class: scala.collection.LinearSeqLike$$anon, value: empty iterator)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (1,empty iterator))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)
我的代码示例如下。请注意,作为给定的 运行s OK(空迭代器被 returned 作为
mapPartitionsWithIndex 值。)但是当你 运行 使用现在注释掉的版本时
mapPartitionsWithIndex 调用你会得到上面的错误。
如果有人对如何使它起作用有任何建议,我将非常感激。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object ANonWorkingExample extends App {
val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val parallel: RDD[Int] = sc.parallelize(1 to 9)
val parts: Array[Partition] = parallel.partitions
val partRDD: RDD[(Int, Iterator[Int])] =
parallel.coalesce(3).
mapPartitionsWithIndex {
(partitionIndex: Int, inputiterator: Iterator[Int]) =>
val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
// Iterator((partitionIndex, mappedInput)) // FAILS
Iterator() // no exception.. but not really what i want.
}
val data = partRDD.collect
println("data:" + data.toList);
}
我不确定你想要达到什么目的,与这里的一些专家相比,我是一个新手。
我展示了一些东西,可能会让您了解如何正确地做我认为正确的事情并发表一些评论:
- 您似乎明确地获取了分区并调用了 mapPartitions - 对我来说是第一个。
- mapPartitions 中的 RDD 和各种 SPARK SCALA 东西不会飞;它是关于可迭代对象的,我认为你需要降到仅 SCALA 级别。
- 可序列化错误来自执行 List[Int]。
这是一个显示索引分区以及相应索引值的示例。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
// from your stuff, left in
val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped = parallel.mapPartitionsWithIndex{
(index, iterator) => {
println("Called in Partition -> " + index)
val myList = iterator.toList
myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
}
}
mapped.collect()
下面的returns有点像我认为你想要的东西:
res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))
最后说明:文档等内容并不容易理解,您无法从字数统计示例中获得全部信息!
所以,希望这对您有所帮助。
我认为它可能会让你走上你想去的地方的正确道路,我不太明白,但也许你现在可以只见树木不见森林了。
所以,我所做的愚蠢的事情是尝试 return 一个不可序列化的数据结构:一个迭代器,正如我得到的堆栈跟踪清楚地表明的那样。
解决方案是不使用迭代器。相反,使用像 Seq 或 List 这样的集合。下面的示例程序说明了执行我尝试执行的操作的正确方法。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object AWorkingExample extends App {
val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val parallel: RDD[Int] = sc.parallelize(1 to 9)
val parts: Array[Partition] = parallel.partitions
val partRDD: RDD[(Int, List[Int])] =
parallel.coalesce(3).
mapPartitionsWithIndex {
(partitionIndex: Int, inputiterator: Iterator[Int]) =>
val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
Iterator((partitionIndex, mappedInput.toList)) // Note the .toList() call -- that makes it work
}
val data = partRDD.collect
println("data:" + data.toList);
}
顺便说一句,我最初想做的是具体查看并行化到 RDD 结构中的哪些数据块分配给了哪个分区。如果你 运行 这个程序,你会得到以下输出:
数据:列表((0,列表(2, 3)), (1,列表(4, 5, 6)), (2,列表(7, 8, 9, 10)))
有趣的是,数据分布本来可以更平衡,但事实并非如此。这不是问题的重点,但我觉得这很有趣。
我一直在试验 Spark 的 mapPartitionsWithIndex,我 运行 遇到问题时 试图 return 元组的迭代器本身包含一个空迭代器。
我尝试了几种不同的构造内部迭代器的方法[通过 Iterator() 和 List(...).iterator],以及 所有的道路都让我得到这个错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon
Serialization stack:
- object not serializable (class: scala.collection.LinearSeqLike$$anon, value: empty iterator)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (1,empty iterator))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)
我的代码示例如下。请注意,作为给定的 运行s OK(空迭代器被 returned 作为 mapPartitionsWithIndex 值。)但是当你 运行 使用现在注释掉的版本时 mapPartitionsWithIndex 调用你会得到上面的错误。
如果有人对如何使它起作用有任何建议,我将非常感激。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object ANonWorkingExample extends App {
val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val parallel: RDD[Int] = sc.parallelize(1 to 9)
val parts: Array[Partition] = parallel.partitions
val partRDD: RDD[(Int, Iterator[Int])] =
parallel.coalesce(3).
mapPartitionsWithIndex {
(partitionIndex: Int, inputiterator: Iterator[Int]) =>
val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
// Iterator((partitionIndex, mappedInput)) // FAILS
Iterator() // no exception.. but not really what i want.
}
val data = partRDD.collect
println("data:" + data.toList);
}
我不确定你想要达到什么目的,与这里的一些专家相比,我是一个新手。
我展示了一些东西,可能会让您了解如何正确地做我认为正确的事情并发表一些评论:
- 您似乎明确地获取了分区并调用了 mapPartitions - 对我来说是第一个。
- mapPartitions 中的 RDD 和各种 SPARK SCALA 东西不会飞;它是关于可迭代对象的,我认为你需要降到仅 SCALA 级别。
- 可序列化错误来自执行 List[Int]。
这是一个显示索引分区以及相应索引值的示例。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
// from your stuff, left in
val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped = parallel.mapPartitionsWithIndex{
(index, iterator) => {
println("Called in Partition -> " + index)
val myList = iterator.toList
myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
}
}
mapped.collect()
下面的returns有点像我认为你想要的东西:
res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))
最后说明:文档等内容并不容易理解,您无法从字数统计示例中获得全部信息!
所以,希望这对您有所帮助。
我认为它可能会让你走上你想去的地方的正确道路,我不太明白,但也许你现在可以只见树木不见森林了。
所以,我所做的愚蠢的事情是尝试 return 一个不可序列化的数据结构:一个迭代器,正如我得到的堆栈跟踪清楚地表明的那样。
解决方案是不使用迭代器。相反,使用像 Seq 或 List 这样的集合。下面的示例程序说明了执行我尝试执行的操作的正确方法。
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object AWorkingExample extends App {
val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val parallel: RDD[Int] = sc.parallelize(1 to 9)
val parts: Array[Partition] = parallel.partitions
val partRDD: RDD[(Int, List[Int])] =
parallel.coalesce(3).
mapPartitionsWithIndex {
(partitionIndex: Int, inputiterator: Iterator[Int]) =>
val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
Iterator((partitionIndex, mappedInput.toList)) // Note the .toList() call -- that makes it work
}
val data = partRDD.collect
println("data:" + data.toList);
}
顺便说一句,我最初想做的是具体查看并行化到 RDD 结构中的哪些数据块分配给了哪个分区。如果你 运行 这个程序,你会得到以下输出:
数据:列表((0,列表(2, 3)), (1,列表(4, 5, 6)), (2,列表(7, 8, 9, 10)))
有趣的是,数据分布本来可以更平衡,但事实并非如此。这不是问题的重点,但我觉得这很有趣。