在 partitionBy 创建的一个输出目录中对数据进行排序

Sort data within one output directory created by partitionBy

我有一个很大的地理空间数据集 partitionBy quadkey 的 5 级。 在每个qk5级目录中,大约有1-50Gb的数据,所以一个文件放不下。我想在进行地理空间查询时从下推过滤器中受益。所以我希望一个 qk5 分区内的文件按更高的 qk 分辨率排序(假设 quadkey 级别 10)。 问题:有没有一种方法可以在 partitionBy 批处理中对数据进行排序? 例如:

qk5=00001/
    part1.parquet
    part2.parquet
    part3.parquet
    part4.parquet
...

qk5=33333/
    part10000.parquet
    part20000.parquet
    part30000.parquet
    part40000.parquet

我想要 part1.parquet、part2.parquet、part3.parquet、part4.parquet 中的数据按列 'qk10' 排序。

这是当前代码,但它只提供了一个特定分区内的排序(例如 part1.parquet):

// Parquet save
preExportRdd.toDF
  .repartition(partitionsNumber, $"salt")
  .sortWithinPartitions($"qk10")
  .drop("salt")
  .write
  .partitionBy("qk")
  .format("parquet")
  .option("compression", "gzip")
  .mode(SaveMode.Append)
  .save(exportUrl)

问题是您没有按 qk 字段对 Dataframe 进行全局排序,这会导致相同的 qk 值分布在不同的 spark 分区中。 在写入阶段,由于partitionBy("qk"),写入特定物理分区(文件夹)的输出可能来自不同的spark分区,这会导致您的输出数据未排序。

尝试以下方法:

preExportRdd.toDF
  .repartitionByRange(partitionsNumber, $"qk", $"qk10", $"salt")
  .sortWithinPartitions($"qk10")
  .drop("salt")
  .write
  .partitionBy("qk")
  .format("parquet")
  .option("compression", "gzip")
  .mode(SaveMode.Append)
  .save(exportUrl)

repartitionByRange 将按提供的列对您的 Dataframe 进行排序,并将排序后的 Dataframe 拆分为所需数量的分区。