Scala:如何按行号拆分数据框?
Scala: How can I split up a dataframe by row number?
我想将 270 万行的数据帧拆分为 100000 行的小数据帧,所以最终得到大约 27 个数据帧,我也想将其存储为 csv 文件。
我已经看过这个 partitionBy 和 groupBy,但我不需要担心任何条件,除了它们必须按日期排序。我正在尝试编写自己的代码来完成这项工作,但如果您知道我可以使用的一些 Scala (Spark) 函数,那就太好了!
谢谢大家的建议!
您可以使用 RDD API 中的 zipWithIndex
(不幸的是,在 SparkSQL 中没有等效项)将每一行映射到一个索引,范围在 0
和 rdd.count - 1
之间。
因此,如果您有一个我认为已相应排序的数据框,您将需要在两个 API 之间来回切换,如下所示:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)
// zipping the data
val partitionSize = 5 // I use 5 but you can use 100000 in your case
val zipped_rdd = df.rdd
.zipWithIndex.map{ case (row, id) =>
Row.fromSeq(row.toSeq :+ id / partitionSize )
}
//back to df
val newField = StructField("partition", LongType, false)
val zipped_df = spark
.createDataFrame(zipped_rdd, df.schema.add(newField))
让我们看一下数据,我们有一个名为分区的新列,它对应于您想要拆分数据的方式。
zipped_df.show(15) // 5 rows by partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
| 0| 0| 0|
| 1| 1| 0|
| 2| 2| 0|
| 3| 3| 0|
| 4| 4| 0|
| 5| 5| 1|
| 6| 6| 1|
| 7| 7| 1|
| 8| 8| 1|
| 9| 9| 1|
| 10| 0| 2|
| 11| 1| 2|
| 12| 2| 2|
| 13| 3| 2|
| 14| 4| 2|
+---+----+---------+
// using partitionBy to write the data
zipped_df.write
.partitionBy("partition")
.csv(".../testPart.csv")
我想将 270 万行的数据帧拆分为 100000 行的小数据帧,所以最终得到大约 27 个数据帧,我也想将其存储为 csv 文件。
我已经看过这个 partitionBy 和 groupBy,但我不需要担心任何条件,除了它们必须按日期排序。我正在尝试编写自己的代码来完成这项工作,但如果您知道我可以使用的一些 Scala (Spark) 函数,那就太好了!
谢谢大家的建议!
您可以使用 RDD API 中的 zipWithIndex
(不幸的是,在 SparkSQL 中没有等效项)将每一行映射到一个索引,范围在 0
和 rdd.count - 1
之间。
因此,如果您有一个我认为已相应排序的数据框,您将需要在两个 API 之间来回切换,如下所示:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)
// zipping the data
val partitionSize = 5 // I use 5 but you can use 100000 in your case
val zipped_rdd = df.rdd
.zipWithIndex.map{ case (row, id) =>
Row.fromSeq(row.toSeq :+ id / partitionSize )
}
//back to df
val newField = StructField("partition", LongType, false)
val zipped_df = spark
.createDataFrame(zipped_rdd, df.schema.add(newField))
让我们看一下数据,我们有一个名为分区的新列,它对应于您想要拆分数据的方式。
zipped_df.show(15) // 5 rows by partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
| 0| 0| 0|
| 1| 1| 0|
| 2| 2| 0|
| 3| 3| 0|
| 4| 4| 0|
| 5| 5| 1|
| 6| 6| 1|
| 7| 7| 1|
| 8| 8| 1|
| 9| 9| 1|
| 10| 0| 2|
| 11| 1| 2|
| 12| 2| 2|
| 13| 3| 2|
| 14| 4| 2|
+---+----+---------+
// using partitionBy to write the data
zipped_df.write
.partitionBy("partition")
.csv(".../testPart.csv")