Foreach 中的 Spark

Spark in Foreach

val a = sc.textFile("/user/cts367689/datagen.txt")

val b = a.map(x => (x.split(",")(0),x.split(",")(2),x.split(4))))

val c = b.filter(x => (x._3.toInt > 500))

c.foreach(x => println(x))

c.foreach {x => {println(x)}}

当我使用 for-each 时,我没有得到预期的输出 statement.I 希望输出在一行中打印一个但不确定我的代码有什么问题。

我认为这个问题之前已经回答过几次了,但现在我们再来一次,来自官方 Programming Guide :

打印 RDD 的元素

一个常见的习惯用法是尝试使用 rdd.foreach(println)rdd.map(println) 打印出 RDD 的元素。在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。

scala> val rdd = sc.parallelize(Seq((1,2,3),(2,3,4)))
// rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd.foreach(println)
// (1,2,3)
// (2,3,4)

但是,在集群模式下,执行程序调用的标准输出的输出现在改为写入执行程序的标准输出,而不是 driver 上的标准输出,因此 driver 上的标准输出获胜展示这些!

要打印 driver 上的所有元素,需要 collect() 将数据返回到 driver 节点,因此:

scala> rdd.collect().foreach(println)
// (1,2,3)
// (2,3,4)

这是限制。但是,如果您的数据不适合 driver,这可能会导致 driver 到 运行 内存不足,因为 collect() 将整个 RDD 提取到一台机器上;从而导致你的 driver 崩溃。

如果只需要打印RDD的几个元素,更安全的方法是使用take():

scala> val rdd = sc.parallelize(Range(1, 1000000000))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> rdd.take(100).foreach(println)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// [...]

PS: 关于 foreach 方法的小说明。 foreach 运行 是数据集每个元素的函数。这种方法通常用于副作用,例如更新累加器或与外部存储系统交互。

我希望这能回答您的问题。