引发奇怪的行为:在地图上打印列表
Spark strange behaviour: printing list in map over group
Spark map
函数中具有奇怪 println
行为的简单案例。
部分记录首先按 id
分组。在这种情况下,所有记录都具有相同的 id
,因此我们得到一个组。然后映射到这个组,我们得到 RDD[List[Strange.R]]
然后将它转换为 List[Strange.R]
。出于某种原因迭代和打印此列表会打印其元素两次。为什么?
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Strange {
case class R(foo: String, bar: String, id: Int)
val recs = Array(
R("foo1", "bar1", 1),
R("foo2", "bar2", 1),
R("foo3", "bar3", 1))
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setAppName("Test")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val rs = sc.parallelize(recs)
test(rs)
}
def test(rs:org.apache.spark.rdd.RDD[R]) = {
val grp = rs.groupBy({r=>r.id}) // group records by id
println("*** Group count: "+grp.count())
val rddLst = grp.map(tuple => tuple match{
case (id, iter) => {
val x = iter.toList
println("--- List size: "+x.size)
x.foreach(println)
x
}
})
val rdd = rddLst.flatMap(identity)
println("*** RDD[List] count: "+rddLst.count+" RDD count: "+rdd.count)
}
}
输出为:
*** Group count: 1
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
*** RDD[List] count: 1 RDD count: 3
请记住,spark RDD 是 'lazily evaluated',因此每当您对 RDD 执行操作(例如计数)时,它都会执行该 RDD 上的所有转换(例如 map),即使它们已经之前被执行过。
所以当你运行
rddLst.count
这会执行地图中的代码(您正在执行 println 的地方)
当你这样做时,该代码会再次执行
rdd.count
你可以通过调用rdd.toDebugString来查看RDD是如何组成的。
如果您只想转换为 运行 一次,您可以将 .cache 放在地图的结果上,但这并不能保证它,因为缓存只是激发的优化提示。
迂腐一点,map 中的代码应该没有副作用,这就是为什么您会看到 'strange' 行为。
Spark map
函数中具有奇怪 println
行为的简单案例。
部分记录首先按 id
分组。在这种情况下,所有记录都具有相同的 id
,因此我们得到一个组。然后映射到这个组,我们得到 RDD[List[Strange.R]]
然后将它转换为 List[Strange.R]
。出于某种原因迭代和打印此列表会打印其元素两次。为什么?
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Strange {
case class R(foo: String, bar: String, id: Int)
val recs = Array(
R("foo1", "bar1", 1),
R("foo2", "bar2", 1),
R("foo3", "bar3", 1))
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setAppName("Test")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val rs = sc.parallelize(recs)
test(rs)
}
def test(rs:org.apache.spark.rdd.RDD[R]) = {
val grp = rs.groupBy({r=>r.id}) // group records by id
println("*** Group count: "+grp.count())
val rddLst = grp.map(tuple => tuple match{
case (id, iter) => {
val x = iter.toList
println("--- List size: "+x.size)
x.foreach(println)
x
}
})
val rdd = rddLst.flatMap(identity)
println("*** RDD[List] count: "+rddLst.count+" RDD count: "+rdd.count)
}
}
输出为:
*** Group count: 1
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
--- List size: 3
R(foo1,bar1,1)
R(foo2,bar2,1)
R(foo3,bar3,1)
*** RDD[List] count: 1 RDD count: 3
请记住,spark RDD 是 'lazily evaluated',因此每当您对 RDD 执行操作(例如计数)时,它都会执行该 RDD 上的所有转换(例如 map),即使它们已经之前被执行过。
所以当你运行
rddLst.count
这会执行地图中的代码(您正在执行 println 的地方)
当你这样做时,该代码会再次执行
rdd.count
你可以通过调用rdd.toDebugString来查看RDD是如何组成的。
如果您只想转换为 运行 一次,您可以将 .cache 放在地图的结果上,但这并不能保证它,因为缓存只是激发的优化提示。
迂腐一点,map 中的代码应该没有副作用,这就是为什么您会看到 'strange' 行为。