将 Spark DataFrame 数据分成单独的文件
Divide Spark DataFrame data into separate files
我有以下来自 s3 文件的 DataFrame 输入,需要将数据转换为以下所需的输出。我在 Scala 中使用 Spark 1.5.1 版,但可以使用 Python 更改为 Spark。欢迎提出任何建议。
数据帧输入:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
期望的输出:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
这是我尝试过的现有 Spark Scala 代码:
val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
当前输出:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
我现有代码的一些问题是 groupBy returns 一个 GroupedData 对象,我可能不想对该数据执行 count/sum/agg 函数。我正在寻找一种更好的技术来分组和输出数据。数据集非常大。
这可以使用 DataFrameWriter
的 partitionBy
选项来实现。一般语法如下:
df.write.partitionBy("name", "animal").format(...).save(...)
不幸的是,Spark 1.5 中唯一支持分区的纯文本格式是 JSON。
如果您可以将 Spark 安装更新为:
- 1.6 - 您可以使用
partitionBy
和 text
格式。如果您需要组 (repartition
) 的单个输出文件,也需要 1.6。
- 2.0 - 您可以使用
partitionBy
和 csv
格式。
我相信在 1.5 中您最好的选择是将文件写成 JSON 并转换单个输出文件。
如果distinct name', 'animals
的数量很少,你可以尝试为每个组执行单独的写入:
val dist = df.select("name", "animal").rdd.collect.map {
case Row(name: String, animal: String) => (name, animal)
}
for {
(name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
.select($"data").write.format("csv").save(s"/prefix/$name/$animal")
但这不会随着组合数量的增加而扩展。
我有以下来自 s3 文件的 DataFrame 输入,需要将数据转换为以下所需的输出。我在 Scala 中使用 Spark 1.5.1 版,但可以使用 Python 更改为 Spark。欢迎提出任何建议。
数据帧输入:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
期望的输出:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
这是我尝试过的现有 Spark Scala 代码:
val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
当前输出:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
我现有代码的一些问题是 groupBy returns 一个 GroupedData 对象,我可能不想对该数据执行 count/sum/agg 函数。我正在寻找一种更好的技术来分组和输出数据。数据集非常大。
这可以使用 DataFrameWriter
的 partitionBy
选项来实现。一般语法如下:
df.write.partitionBy("name", "animal").format(...).save(...)
不幸的是,Spark 1.5 中唯一支持分区的纯文本格式是 JSON。
如果您可以将 Spark 安装更新为:
- 1.6 - 您可以使用
partitionBy
和text
格式。如果您需要组 (repartition
) 的单个输出文件,也需要 1.6。 - 2.0 - 您可以使用
partitionBy
和csv
格式。
我相信在 1.5 中您最好的选择是将文件写成 JSON 并转换单个输出文件。
如果distinct name', 'animals
的数量很少,你可以尝试为每个组执行单独的写入:
val dist = df.select("name", "animal").rdd.collect.map {
case Row(name: String, animal: String) => (name, animal)
}
for {
(name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
.select($"data").write.format("csv").save(s"/prefix/$name/$animal")
但这不会随着组合数量的增加而扩展。