使用spark按排序顺序将数据合并到csv文件
Using spark to merge data in sorted order to csv files
我有这样的数据集:
name time val
---- ----- ---
fred 04:00 111
greg 03:00 123
fred 01:00 411
fred 05:00 921
fred 11:00 157
greg 12:00 333
以及某个文件夹中的 csv 文件,每个文件对应数据集中的每个唯一名称:
fred.csv
greg.csv
例如 fred.csv 的内容如下所示:
00:00 222
10:00 133
我的目标是按排序的时间顺序将数据集有效地合并到 CSV 中,这样 fred.csv,例如,最终会像这样:
00:00 222
01:00 411
04:00 111
05:00 921
10:00 133
在现实中,有成千上万个唯一的名字,而不仅仅是两个。我使用 union 和 sort 函数按顺序添加行,但我没有成功地使用 partitionBy,对于每个,或者将行合并到它们正确的 CSV 文件中。
导入并声明必要的变量
val spark = SparkSession.builder
.master("local")
.appName("Partition Sort Demo")
.getOrCreate;
import spark.implicits._
从源文件创建数据框
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.csv("csv/file/location")
//df.show()
+----+-----+---+
|name| time|val|
+----+-----+---+
|fred|04:00|111|
|greg|03:00|123|
|fred|01:00|411|
|fred|05:00|921|
|fred|11:00|157|
|greg|12:00|333|
+----+-----+---+
Now repartition
dataframe by name and sort
each partition then save
them
//repartition
val repartitionedDf = df.repartition($"name")
for {
//fetch the distinct names in dataframe use as filename
distinctName <- df.dropDuplicates("name").collect.map(_ (0))
} yield {
import org.apache.spark.sql.functions.lit
repartitionedDf.select("time", "val")
.filter($"name" === lit(distinctName)) //filter df by name
.coalesce(1)
.sortWithinPartitions($"time") //sort
.write.mode("overwrite").csv("location/" + distinctName + ".csv") //save
}
注:
CSV 文件的内容在突出显示的文件中可用。
我有这样的数据集:
name time val
---- ----- ---
fred 04:00 111
greg 03:00 123
fred 01:00 411
fred 05:00 921
fred 11:00 157
greg 12:00 333
以及某个文件夹中的 csv 文件,每个文件对应数据集中的每个唯一名称:
fred.csv
greg.csv
例如 fred.csv 的内容如下所示:
00:00 222
10:00 133
我的目标是按排序的时间顺序将数据集有效地合并到 CSV 中,这样 fred.csv,例如,最终会像这样:
00:00 222
01:00 411
04:00 111
05:00 921
10:00 133
在现实中,有成千上万个唯一的名字,而不仅仅是两个。我使用 union 和 sort 函数按顺序添加行,但我没有成功地使用 partitionBy,对于每个,或者将行合并到它们正确的 CSV 文件中。
导入并声明必要的变量
val spark = SparkSession.builder
.master("local")
.appName("Partition Sort Demo")
.getOrCreate;
import spark.implicits._
从源文件创建数据框
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.csv("csv/file/location")
//df.show()
+----+-----+---+
|name| time|val|
+----+-----+---+
|fred|04:00|111|
|greg|03:00|123|
|fred|01:00|411|
|fred|05:00|921|
|fred|11:00|157|
|greg|12:00|333|
+----+-----+---+
Now
repartition
dataframe by name andsort
each partition thensave
them//repartition val repartitionedDf = df.repartition($"name") for { //fetch the distinct names in dataframe use as filename distinctName <- df.dropDuplicates("name").collect.map(_ (0)) } yield { import org.apache.spark.sql.functions.lit repartitionedDf.select("time", "val") .filter($"name" === lit(distinctName)) //filter df by name .coalesce(1) .sortWithinPartitions($"time") //sort .write.mode("overwrite").csv("location/" + distinctName + ".csv") //save }
注:
CSV 文件的内容在突出显示的文件中可用。