Spark DataFrame zipWithIndex
Spark DataFrame zipWithIndex
我正在使用 DataFrame 读取 .parquet 文件,但不是将它们变成一个 rdd 来执行我想对它们执行的正常处理。
所以我有我的文件:
val dataSplit = sqlContext.parquetFile("input.parquet")
val convRDD = dataSplit.rdd
val columnIndex = convRDD.flatMap(r => r.zipWithIndex)
即使我从数据帧转换为 RDD,也会出现以下错误:
:26: error: value zipWithIndex is not a member of
org.apache.spark.sql.Row
任何人都知道如何做我想做的事情,本质上是试图获取值和列索引。
我在想:
val dataSplit = sqlContext.parquetFile(inputVal.toString)
val schema = dataSplit.schema
val columnIndex = dataSplit.flatMap(r => 0 until schema.length
但由于不确定如何对 zipWithIndex 执行相同操作而卡在了最后一部分。
您可以简单地将 Row
转换为 Seq
:
convRDD.flatMap(r => r.toSeq.zipWithIndex)
这里要注意的重要一点是提取类型信息变得很棘手。 Row.toSeq
returns Seq[Any]
结果 RDD
是 RDD[(Any, Int)]
。
我正在使用 DataFrame 读取 .parquet 文件,但不是将它们变成一个 rdd 来执行我想对它们执行的正常处理。
所以我有我的文件:
val dataSplit = sqlContext.parquetFile("input.parquet")
val convRDD = dataSplit.rdd
val columnIndex = convRDD.flatMap(r => r.zipWithIndex)
即使我从数据帧转换为 RDD,也会出现以下错误:
:26: error: value zipWithIndex is not a member of org.apache.spark.sql.Row
任何人都知道如何做我想做的事情,本质上是试图获取值和列索引。
我在想:
val dataSplit = sqlContext.parquetFile(inputVal.toString)
val schema = dataSplit.schema
val columnIndex = dataSplit.flatMap(r => 0 until schema.length
但由于不确定如何对 zipWithIndex 执行相同操作而卡在了最后一部分。
您可以简单地将 Row
转换为 Seq
:
convRDD.flatMap(r => r.toSeq.zipWithIndex)
这里要注意的重要一点是提取类型信息变得很棘手。 Row.toSeq
returns Seq[Any]
结果 RDD
是 RDD[(Any, Int)]
。