如何迭代记录spark scala?
How to iterate records spark scala?
我有一个变量 "myrdd",它是一个 avro 文件,其中包含通过 hadoopfile 加载的 10 条记录。
当我做的时候
myrdd.first_1.datum.getName()
我知道名字了。问题是,我在 "myrdd" 中有 10 条记录。当我这样做时:
myrdd.map(x => {println(x._1.datum.getName())})
它不起作用并且一次打印出一个奇怪的对象。我如何遍历所有记录?
这是使用 spark-shell
进行类似场景的会话的日志。
给定
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
你的问题看起来像
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
所以 map
只是 returns 另一个 RDD(该函数不会立即应用,当您真正迭代结果时会应用该函数 "lazily")。
因此,当您具体化(使用 collect()
)时,您会得到一个 "normal" 集合:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
你可以map
。请注意,在这种情况下,您在传递给 map
(println
)的闭包中有一个副作用,println
的结果是 Unit
):
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
如果在最后应用 collect
,结果相同:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
但是如果你只想打印行,你可以简化为使用foreach
:
scala> persons.foreach(t => println(t))
[Justin,19]
正如@RohanAletty 在评论中指出的那样,这适用于本地 Spark 作业。如果作业在集群中运行,则还需要 collect
:
persons.collect().foreach(t => println(t))
备注
- 可以在
Iterator
class. 中观察到相同的行为
- 上面会话的输出已经重新排序
更新
关于过滤:collect
的位置是"bad",如果在collect
之后应用过滤器,可以在之前应用。
例如,这些表达式给出相同的结果:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
但第二种情况更糟,因为该过滤器可以在 collect
.
之前应用
这同样适用于任何类型的聚合。
我有一个变量 "myrdd",它是一个 avro 文件,其中包含通过 hadoopfile 加载的 10 条记录。
当我做的时候
myrdd.first_1.datum.getName()
我知道名字了。问题是,我在 "myrdd" 中有 10 条记录。当我这样做时:
myrdd.map(x => {println(x._1.datum.getName())})
它不起作用并且一次打印出一个奇怪的对象。我如何遍历所有记录?
这是使用 spark-shell
进行类似场景的会话的日志。
给定
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
你的问题看起来像
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
所以 map
只是 returns 另一个 RDD(该函数不会立即应用,当您真正迭代结果时会应用该函数 "lazily")。
因此,当您具体化(使用 collect()
)时,您会得到一个 "normal" 集合:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
你可以map
。请注意,在这种情况下,您在传递给 map
(println
)的闭包中有一个副作用,println
的结果是 Unit
):
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
如果在最后应用 collect
,结果相同:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
但是如果你只想打印行,你可以简化为使用foreach
:
scala> persons.foreach(t => println(t))
[Justin,19]
正如@RohanAletty 在评论中指出的那样,这适用于本地 Spark 作业。如果作业在集群中运行,则还需要 collect
:
persons.collect().foreach(t => println(t))
备注
- 可以在
Iterator
class. 中观察到相同的行为
- 上面会话的输出已经重新排序
更新
关于过滤:collect
的位置是"bad",如果在collect
之后应用过滤器,可以在之前应用。
例如,这些表达式给出相同的结果:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
但第二种情况更糟,因为该过滤器可以在 collect
.
这同样适用于任何类型的聚合。