SPARK DataFrame:如何根据相同的列值有效地为每个组拆分数据框

SPARK DataFrame: How to efficiently split dataframe for each group based on same column values

我生成了一个 DataFrame,如下所示:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

我想根据 col("Hour") 的每个唯一值制作新的数据框,即

所以期望的输出是:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
+----+--------+----------+

df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
+----+--------+----------+

同样,

df2 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
+----+--------+----------+

非常感谢任何帮助。

编辑 1:

我尝试过的:

df.foreach(
  row => splitHour(row)
  )

def splitHour(row: Row) ={
    val Hour=row.getAs[Long]("Hour")

    val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))

    val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")

    val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))

    mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
  }

此策略存在问题:

运行 在超过 100 万行的数据帧 df 上花费了 8 个小时,并且在单个节点上为 spark 作业提供了大约 10 GB 的 RAM。所以,join 被证明是非常低效的。

警告:我必须将每个数据帧 mydf 写成镶木地板,它具有需要维护的嵌套模式(而不是展平)。

正如我在评论中指出的那样,解决此问题的一种可能简单的方法是使用:

df.write.partitionBy("hour").saveAsTable("myparquet")

如前所述,文件夹结构为 myparquet/hour=1myparquet/hour=2、...、myparquet/hour=24,而不是 myparquet/1myparquet/2、...。 .., myparquet/24.

要更改文件夹结构,您可以

  1. 可能在显式 HiveContext 中使用 Hive 配置设置 hcat.dynamic.partitioning.custom.pattern;更多信息请访问 HCatalog DynamicPartitions
  2. 另一种方法是在执行 df.write.partitionBy.saveAsTable(...) 命令后直接更改文件系统,使用类似 for f in *; do mv $f ${f/${f:0:5}/} ; done 的命令,这会从文件夹名称中删除 Hour= 文本。

重要的是要注意,通过更改文件夹的命名模式,当您在该文件夹中 运行 spark.read.parquet(...) 时,Spark 将不会自动理解动态分区,因为它缺少 partitionKey (即Hour)信息。

//If you want to divide a dataset into n number of equal datasetssets
double[] arraySplit = {1,1,1...,n}; //you can also divide into ratio if you change the numbers.

List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit,1);

另一种可能的解决方案:

df.write.mode("overwrite").partitionBy("hour").parquet("address/to/parquet/location")

除了使用 parquetmode("overwrite").

外,这与第一个答案类似