如何迭代记录spark scala?

How to iterate records spark scala?

我有一个变量 "myrdd",它是一个 avro 文件,其中包含通过 hadoopfile 加载的 10 条记录。

当我做的时候

myrdd.first_1.datum.getName()

我知道名字了。问题是,我在 "myrdd" 中有 10 条记录。当我这样做时:

myrdd.map(x => {println(x._1.datum.getName())})

它不起作用并且一次打印出一个奇怪的对象。我如何遍历所有记录?

这是使用 spark-shell 进行类似场景的会话的日志。

给定

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]

你的问题看起来像

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

所以 map 只是 returns 另一个 RDD(该函数不会立即应用,当您真正迭代结果时会应用该函数 "lazily")。

因此,当您具体化(使用 collect())时,您会得到一个 "normal" 集合:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

你可以map。请注意,在这种情况下,您在传递给 mapprintln)的闭包中有一个副作用,println 的结果是 Unit):

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

如果在最后应用 collect,结果相同:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

但是如果你只想打印行,你可以简化为使用foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

正如@RohanAletty 在评论中指出的那样,这适用于本地 Spark 作业。如果作业在集群中运行,则还需要 collect

persons.collect().foreach(t => println(t))

备注

  • 可以在 Iterator class.
  • 中观察到相同的行为
  • 上面会话的输出已经重新排序

更新

关于过滤:collect的位置是"bad",如果在collect之后应用过滤器,可以在之前应用。

例如,这些表达式给出相同的结果:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]

但第二种情况更糟,因为该过滤器可以在 collect.

之前应用

这同样适用于任何类型的聚合。