引发奇怪的行为:在地图上打印列表

Spark strange behaviour: printing list in map over group

Spark map 函数中具有奇怪 println 行为的简单案例。

部分记录首先按 id 分组。在这种情况下,所有记录都具有相同的 id,因此我们得到一个组。然后映射到这个组,我们得到 RDD[List[Strange.R]] 然后将它转换为 List[Strange.R]。出于某种原因迭代和打印此列表会打印其元素两次。为什么?

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Strange {

  case class R(foo: String, bar: String, id: Int)
  val recs = Array(
    R("foo1", "bar1", 1),
    R("foo2", "bar2", 1),
    R("foo3", "bar3", 1))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)
    test(rs)
  }

  def test(rs:org.apache.spark.rdd.RDD[R]) = {
    val grp = rs.groupBy({r=>r.id}) // group records by id
        println("*** Group count: "+grp.count())
    val rddLst = grp.map(tuple => tuple match{ 
      case (id, iter) => {
        val x = iter.toList
        println("--- List size: "+x.size)
        x.foreach(println)
        x
        } 
      })
    val rdd = rddLst.flatMap(identity)
    println("*** RDD[List] count: "+rddLst.count+" RDD count: "+rdd.count)
  }
}

输出为:

*** Group count: 1
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
*** RDD[List] count: 1 RDD count: 3

请记住,spark RDD 是 'lazily evaluated',因此每当您对 RDD 执行操作(例如计数)时,它都会执行该 RDD 上的所有转换(例如 map),即使它们已经之前被执行过。

所以当你运行

rddLst.count

这会执行地图中的代码(您正在执行 println 的地方)

当你这样做时,该代码会再次执行

rdd.count

你可以通过调用rdd.toDebugString来查看RDD是如何组成的。

如果您只想转换为 运行 一次,您可以将 .cache 放在地图的结果上,但这并不能保证它,因为缓存只是激发的优化提示。

迂腐一点,map 中的代码应该没有副作用,这就是为什么您会看到 'strange' 行为。