SPARK DataFrame:如何根据相同的列值有效地为每个组拆分数据框
SPARK DataFrame: How to efficiently split dataframe for each group based on same column values
我生成了一个 DataFrame,如下所示:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
我想根据 col("Hour")
的每个唯一值制作新的数据框,即
- 小时组==0
- 对于 Hour==1 组
- Hour==2组
等等...
所以期望的输出是:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
同样,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
非常感谢任何帮助。
编辑 1:
我尝试过的:
df.foreach(
row => splitHour(row)
)
def splitHour(row: Row) ={
val Hour=row.getAs[Long]("Hour")
val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))
val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")
val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))
mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
此策略存在问题:
运行 在超过 100 万行的数据帧 df
上花费了 8 个小时,并且在单个节点上为 spark 作业提供了大约 10 GB 的 RAM。所以,join
被证明是非常低效的。
警告:我必须将每个数据帧 mydf
写成镶木地板,它具有需要维护的嵌套模式(而不是展平)。
正如我在评论中指出的那样,解决此问题的一种可能简单的方法是使用:
df.write.partitionBy("hour").saveAsTable("myparquet")
如前所述,文件夹结构为 myparquet/hour=1
、myparquet/hour=2
、...、myparquet/hour=24
,而不是 myparquet/1
、myparquet/2
、...。 .., myparquet/24
.
要更改文件夹结构,您可以
- 可能在显式 HiveContext 中使用 Hive 配置设置
hcat.dynamic.partitioning.custom.pattern
;更多信息请访问 HCatalog DynamicPartitions。
- 另一种方法是在执行
df.write.partitionBy.saveAsTable(...)
命令后直接更改文件系统,使用类似 for f in *; do mv $f ${f/${f:0:5}/} ; done
的命令,这会从文件夹名称中删除 Hour=
文本。
重要的是要注意,通过更改文件夹的命名模式,当您在该文件夹中 运行 spark.read.parquet(...)
时,Spark 将不会自动理解动态分区,因为它缺少 partitionKey (即Hour
)信息。
//If you want to divide a dataset into n number of equal datasetssets
double[] arraySplit = {1,1,1...,n}; //you can also divide into ratio if you change the numbers.
List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit,1);
另一种可能的解决方案:
df.write.mode("overwrite").partitionBy("hour").parquet("address/to/parquet/location")
除了使用 parquet
和 mode("overwrite")
.
外,这与第一个答案类似
我生成了一个 DataFrame,如下所示:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
我想根据 col("Hour")
的每个唯一值制作新的数据框,即
- 小时组==0
- 对于 Hour==1 组
- Hour==2组 等等...
所以期望的输出是:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
同样,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
非常感谢任何帮助。
编辑 1:
我尝试过的:
df.foreach(
row => splitHour(row)
)
def splitHour(row: Row) ={
val Hour=row.getAs[Long]("Hour")
val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))
val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")
val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))
mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
此策略存在问题:
运行 在超过 100 万行的数据帧 df
上花费了 8 个小时,并且在单个节点上为 spark 作业提供了大约 10 GB 的 RAM。所以,join
被证明是非常低效的。
警告:我必须将每个数据帧 mydf
写成镶木地板,它具有需要维护的嵌套模式(而不是展平)。
正如我在评论中指出的那样,解决此问题的一种可能简单的方法是使用:
df.write.partitionBy("hour").saveAsTable("myparquet")
如前所述,文件夹结构为 myparquet/hour=1
、myparquet/hour=2
、...、myparquet/hour=24
,而不是 myparquet/1
、myparquet/2
、...。 .., myparquet/24
.
要更改文件夹结构,您可以
- 可能在显式 HiveContext 中使用 Hive 配置设置
hcat.dynamic.partitioning.custom.pattern
;更多信息请访问 HCatalog DynamicPartitions。 - 另一种方法是在执行
df.write.partitionBy.saveAsTable(...)
命令后直接更改文件系统,使用类似for f in *; do mv $f ${f/${f:0:5}/} ; done
的命令,这会从文件夹名称中删除Hour=
文本。
重要的是要注意,通过更改文件夹的命名模式,当您在该文件夹中 运行 spark.read.parquet(...)
时,Spark 将不会自动理解动态分区,因为它缺少 partitionKey (即Hour
)信息。
//If you want to divide a dataset into n number of equal datasetssets
double[] arraySplit = {1,1,1...,n}; //you can also divide into ratio if you change the numbers.
List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit,1);
另一种可能的解决方案:
df.write.mode("overwrite").partitionBy("hour").parquet("address/to/parquet/location")
除了使用 parquet
和 mode("overwrite")
.