迭代 Spark 数据框中的行和列
Iterate rows and columns in Spark dataframe
我有以下动态创建的 Spark 数据框:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
现在,我需要遍历 sqlDF
中的每一行和每一列来打印每一列,这是我的尝试:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
是类型 Row
,但不可迭代,这就是此代码在 row.foreach
中引发编译错误的原因。如何迭代Row
中的每一列?
您可以使用 toSeq
将 Row
转换为 Seq
。一旦转向 Seq
,您可以像往常一样使用 foreach
、map
或任何您需要的
对其进行迭代
sqlDF.foreach { row =>
row.toSeq.foreach{col => println(col) }
}
输出:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
您应该在 Row
:
上使用 mkString
sqlDF.foreach { row =>
println(row.mkString(","))
}
但请注意,这将打印在执行程序 JVM 中,因此通常您不会看到输出(除非您使用 master = local)
假设您有一个 Dataframe
如下所示
+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy| aaa| 20|
|Berta| bbb| 30|
| Joe| ccc| 40|
+-----+------+---+
要循环 Dataframe 并从 Dataframe 中提取元素,您可以选择以下方法之一。
方法 1 - 使用 foreach 循环
无法直接使用 foreach
循环来循环数据帧。为此,首先您必须使用 case class
定义数据框的架构,然后必须将此架构指定给数据框。
import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))
请看下面的结果:
方法 2 - 使用 rdd 循环
在 Dataframe 之上使用 rdd.collect
。 row
变量将包含 rdd
行类型的 Dataframe 的每一行。要从一行中获取每个元素,请使用 row.mkString(",")
,它将以逗号分隔值包含每行的值。使用 split
函数(内置函数),您可以使用索引访问 rdd
行的每个列值。
for (row <- df.rdd.collect)
{
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
}
请注意,这种方法有两个缺点。
1、如果列值中有,
,数据会被错误的拆分到相邻的列中。
2. rdd.collect
是一个 action
returns 所有数据到驱动程序的内存,驱动程序的内存可能没有那么大,无法保存数据,最终导致应用程序失败。
我建议使用方法 1。
方法 3 - 使用 where 和 select
你可以直接使用where
和select
,它们会在内部循环查找数据。因为它不应该抛出 Index out of bound 异常,所以使用了 if 条件
if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
方法 4 - 使用临时表
您可以将dataframe注册为temptable,它将存储在spark的内存中。然后你可以像其他数据库一样使用select查询来查询数据,然后收集并保存在变量
中
df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
sqlDF.foreach
对我不起作用,但@Sarath Avanavu 回答的方法 1 有效,但有时它也在播放记录的顺序。
我找到了另一种有效的方法
df.collect().foreach { row =>
println(row.mkString(","))
}
简单的收集结果然后申请每个
df.collect().foreach(println)
这对我来说效果很好
sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))
您应该遍历允许数据由 Spark 并行处理的分区,并且您可以在分区内的每一行上执行 foreach。
如果需要,您可以将分区中的数据进一步分组
sqlDF.foreachPartition { partitionedRows: Iterator[Model1] =>
if (partitionedRows.take(1).nonEmpty) {
partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
batch.foreach { row =>
.....
让我们假设 resultDF 是 Dataframe。
val resultDF = // DataFrame //
var itr = 0
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList
var load_id = 0
var load_dt = ""
var load_hr = 0
while ( itr < resultRow ){
col1 = resultSet.get(itr).getInt(0)
col2 = resultSet.get(itr).getString(1) // if column is having String value
col3 = resultSet.get(itr).getLong(2) // if column is having Long value
// Write other logic for your code //
itr = itr + 1
}
我使用 FOR 的解决方案是因为我需要它:
解决方案 1:
case class campos_tablas(name:String, sector:String, age:Int)
for (row <- df.as[campos_tablas].take(df.count.toInt))
{
print(row.name.toString)
}
解决方案 2:
for (row <- df.take(df.count.toInt))
{
print(row(0).toString)
}
我有以下动态创建的 Spark 数据框:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
现在,我需要遍历 sqlDF
中的每一行和每一列来打印每一列,这是我的尝试:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
是类型 Row
,但不可迭代,这就是此代码在 row.foreach
中引发编译错误的原因。如何迭代Row
中的每一列?
您可以使用 toSeq
将 Row
转换为 Seq
。一旦转向 Seq
,您可以像往常一样使用 foreach
、map
或任何您需要的
sqlDF.foreach { row =>
row.toSeq.foreach{col => println(col) }
}
输出:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
您应该在 Row
:
mkString
sqlDF.foreach { row =>
println(row.mkString(","))
}
但请注意,这将打印在执行程序 JVM 中,因此通常您不会看到输出(除非您使用 master = local)
假设您有一个 Dataframe
如下所示
+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy| aaa| 20|
|Berta| bbb| 30|
| Joe| ccc| 40|
+-----+------+---+
要循环 Dataframe 并从 Dataframe 中提取元素,您可以选择以下方法之一。
方法 1 - 使用 foreach 循环
无法直接使用 foreach
循环来循环数据帧。为此,首先您必须使用 case class
定义数据框的架构,然后必须将此架构指定给数据框。
import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))
请看下面的结果:
方法 2 - 使用 rdd 循环
在 Dataframe 之上使用 rdd.collect
。 row
变量将包含 rdd
行类型的 Dataframe 的每一行。要从一行中获取每个元素,请使用 row.mkString(",")
,它将以逗号分隔值包含每行的值。使用 split
函数(内置函数),您可以使用索引访问 rdd
行的每个列值。
for (row <- df.rdd.collect)
{
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
}
请注意,这种方法有两个缺点。
1、如果列值中有,
,数据会被错误的拆分到相邻的列中。
2. rdd.collect
是一个 action
returns 所有数据到驱动程序的内存,驱动程序的内存可能没有那么大,无法保存数据,最终导致应用程序失败。
我建议使用方法 1。
方法 3 - 使用 where 和 select
你可以直接使用where
和select
,它们会在内部循环查找数据。因为它不应该抛出 Index out of bound 异常,所以使用了 if 条件
if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
方法 4 - 使用临时表
您可以将dataframe注册为temptable,它将存储在spark的内存中。然后你可以像其他数据库一样使用select查询来查询数据,然后收集并保存在变量
中df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
sqlDF.foreach
对我不起作用,但@Sarath Avanavu 回答的方法 1 有效,但有时它也在播放记录的顺序。
我找到了另一种有效的方法
df.collect().foreach { row =>
println(row.mkString(","))
}
简单的收集结果然后申请每个
df.collect().foreach(println)
这对我来说效果很好
sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))
您应该遍历允许数据由 Spark 并行处理的分区,并且您可以在分区内的每一行上执行 foreach。
如果需要,您可以将分区中的数据进一步分组
sqlDF.foreachPartition { partitionedRows: Iterator[Model1] =>
if (partitionedRows.take(1).nonEmpty) {
partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
batch.foreach { row =>
.....
让我们假设 resultDF 是 Dataframe。
val resultDF = // DataFrame //
var itr = 0
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList
var load_id = 0
var load_dt = ""
var load_hr = 0
while ( itr < resultRow ){
col1 = resultSet.get(itr).getInt(0)
col2 = resultSet.get(itr).getString(1) // if column is having String value
col3 = resultSet.get(itr).getLong(2) // if column is having Long value
// Write other logic for your code //
itr = itr + 1
}
我使用 FOR 的解决方案是因为我需要它:
解决方案 1:
case class campos_tablas(name:String, sector:String, age:Int)
for (row <- df.as[campos_tablas].take(df.count.toInt))
{
print(row.name.toString)
}
解决方案 2:
for (row <- df.take(df.count.toInt))
{
print(row(0).toString)
}