想知道为什么空内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化异常

wondering why empty inner iterator causes not serializable exception with mapPartitionsWithIndex

我一直在试验 Spark 的 mapPartitionsWithIndex,我 运行 遇到问题时 试图 return 元组的迭代器本身包含一个空迭代器。

我尝试了几种不同的构造内部迭代器的方法[通过 Iterator() 和 List(...).iterator],以及 所有的道路都让我得到这个错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon
Serialization stack:
        - object not serializable (class: scala.collection.LinearSeqLike$$anon, value: empty iterator)
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,empty iterator))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1423)

我的代码示例如下。请注意,作为给定的 运行s OK(空迭代器被 returned 作为 mapPartitionsWithIndex 值。)但是当你 运行 使用现在注释掉的版本时 mapPartitionsWithIndex 调用你会得到上面的错误。

如果有人对如何使它起作用有任何建议,我将非常感激。

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ANonWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, Iterator[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex {
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          // Iterator((partitionIndex, mappedInput)) // FAILS
          Iterator()   // no exception.. but not really what i want.

      }

  val data = partRDD.collect
  println("data:" + data.toList);
}

我不确定你想要达到什么目的,与这里的一些专家相比,我是一个新手。

我展示了一些东西,可能会让您了解如何正确地做我认为正确的事情并发表一些评论:

  1. 您似乎明确地获取了分区并调用了 mapPartitions - 对我来说是第一个。
  2. mapPartitions 中的 RDD 和各种 SPARK SCALA 东西不会飞;它是关于可迭代对象的,我认为你需要降到仅 SCALA 级别。
  3. 可序列化错误来自执行 List[Int]。

这是一个显示索引分区以及相应索引值的示例。

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
// from your stuff, left in

val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped =   parallel.mapPartitionsWithIndex{
                       (index, iterator) => {
                          println("Called in Partition -> " + index)
                          val myList = iterator.toList                          
                          myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
                       }
                 }  
mapped.collect()

下面的returns有点像我认为你想要的东西:

res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))

最后说明:文档等内容并不容易理解,您无法从字数统计示例中获得全部信息!

所以,希望这对您有所帮助。

我认为它可能会让你走上你想去的地方的正确道路,我不太明白,但也许你现在可以只见树木不见森林了。

所以,我所做的愚蠢的事情是尝试 return 一个不可序列化的数据结构:一个迭代器,正如我得到的堆栈跟踪清楚地表明的那样。

解决方案是不使用迭代器。相反,使用像 Seq 或 List 这样的集合。下面的示例程序说明了执行我尝试执行的操作的正确方法。

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, List[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex {
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          Iterator((partitionIndex, mappedInput.toList)) // Note the .toList() call -- that makes it work
      }

  val data = partRDD.collect
  println("data:" + data.toList);
}

顺便说一句,我最初想做的是具体查看并行化到 RDD 结构中的哪些数据块分配给了哪个分区。如果你 运行 这个程序,你会得到以下输出:

数据:列表((0,列表(2, 3)), (1,列表(4, 5, 6)), (2,列表(7, 8, 9, 10)))

有趣的是,数据分布本来可以更平衡,但事实并非如此。这不是问题的重点,但我觉得这很有趣。