Foreach 中的 Spark
Spark in Foreach
val a = sc.textFile("/user/cts367689/datagen.txt")
val b = a.map(x => (x.split(",")(0),x.split(",")(2),x.split(4))))
val c = b.filter(x => (x._3.toInt > 500))
c.foreach(x => println(x))
或
c.foreach {x => {println(x)}}
当我使用 for-each 时,我没有得到预期的输出 statement.I 希望输出在一行中打印一个但不确定我的代码有什么问题。
我认为这个问题之前已经回答过几次了,但现在我们再来一次,来自官方 Programming Guide :
打印 RDD 的元素
一个常见的习惯用法是尝试使用 rdd.foreach(println)
或 rdd.map(println)
打印出 RDD 的元素。在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。
scala> val rdd = sc.parallelize(Seq((1,2,3),(2,3,4)))
// rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> rdd.foreach(println)
// (1,2,3)
// (2,3,4)
但是,在集群模式下,执行程序调用的标准输出的输出现在改为写入执行程序的标准输出,而不是 driver 上的标准输出,因此 driver 上的标准输出获胜展示这些!
要打印 driver 上的所有元素,需要 collect()
将数据返回到 driver 节点,因此:
scala> rdd.collect().foreach(println)
// (1,2,3)
// (2,3,4)
这是限制。但是,如果您的数据不适合 driver,这可能会导致 driver 到 运行 内存不足,因为 collect() 将整个 RDD 提取到一台机器上;从而导致你的 driver 崩溃。
如果只需要打印RDD的几个元素,更安全的方法是使用take():
scala> val rdd = sc.parallelize(Range(1, 1000000000))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27
scala> rdd.take(100).foreach(println)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// [...]
PS: 关于 foreach
方法的小说明。 foreach
运行 是数据集每个元素的函数。这种方法通常用于副作用,例如更新累加器或与外部存储系统交互。
我希望这能回答您的问题。
val a = sc.textFile("/user/cts367689/datagen.txt")
val b = a.map(x => (x.split(",")(0),x.split(",")(2),x.split(4))))
val c = b.filter(x => (x._3.toInt > 500))
c.foreach(x => println(x))
或
c.foreach {x => {println(x)}}
当我使用 for-each 时,我没有得到预期的输出 statement.I 希望输出在一行中打印一个但不确定我的代码有什么问题。
我认为这个问题之前已经回答过几次了,但现在我们再来一次,来自官方 Programming Guide :
打印 RDD 的元素
一个常见的习惯用法是尝试使用 rdd.foreach(println)
或 rdd.map(println)
打印出 RDD 的元素。在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。
scala> val rdd = sc.parallelize(Seq((1,2,3),(2,3,4)))
// rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> rdd.foreach(println)
// (1,2,3)
// (2,3,4)
但是,在集群模式下,执行程序调用的标准输出的输出现在改为写入执行程序的标准输出,而不是 driver 上的标准输出,因此 driver 上的标准输出获胜展示这些!
要打印 driver 上的所有元素,需要 collect()
将数据返回到 driver 节点,因此:
scala> rdd.collect().foreach(println)
// (1,2,3)
// (2,3,4)
这是限制。但是,如果您的数据不适合 driver,这可能会导致 driver 到 运行 内存不足,因为 collect() 将整个 RDD 提取到一台机器上;从而导致你的 driver 崩溃。
如果只需要打印RDD的几个元素,更安全的方法是使用take():
scala> val rdd = sc.parallelize(Range(1, 1000000000))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27
scala> rdd.take(100).foreach(println)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// [...]
PS: 关于 foreach
方法的小说明。 foreach
运行 是数据集每个元素的函数。这种方法通常用于副作用,例如更新累加器或与外部存储系统交互。
我希望这能回答您的问题。