在 partitionBy 创建的一个输出目录中对数据进行排序
Sort data within one output directory created by partitionBy
我有一个很大的地理空间数据集 partitionBy quadkey 的 5 级。
在每个qk5级目录中,大约有1-50Gb的数据,所以一个文件放不下。我想在进行地理空间查询时从下推过滤器中受益。所以我希望一个 qk5 分区内的文件按更高的 qk 分辨率排序(假设 quadkey 级别 10)。
问题:有没有一种方法可以在 partitionBy 批处理中对数据进行排序?
例如:
qk5=00001/
part1.parquet
part2.parquet
part3.parquet
part4.parquet
...
qk5=33333/
part10000.parquet
part20000.parquet
part30000.parquet
part40000.parquet
我想要 part1.parquet、part2.parquet、part3.parquet、part4.parquet 中的数据按列 'qk10' 排序。
这是当前代码,但它只提供了一个特定分区内的排序(例如 part1.parquet):
// Parquet save
preExportRdd.toDF
.repartition(partitionsNumber, $"salt")
.sortWithinPartitions($"qk10")
.drop("salt")
.write
.partitionBy("qk")
.format("parquet")
.option("compression", "gzip")
.mode(SaveMode.Append)
.save(exportUrl)
问题是您没有按 qk
字段对 Dataframe 进行全局排序,这会导致相同的 qk
值分布在不同的 spark 分区中。
在写入阶段,由于partitionBy("qk")
,写入特定物理分区(文件夹)的输出可能来自不同的spark分区,这会导致您的输出数据未排序。
尝试以下方法:
preExportRdd.toDF
.repartitionByRange(partitionsNumber, $"qk", $"qk10", $"salt")
.sortWithinPartitions($"qk10")
.drop("salt")
.write
.partitionBy("qk")
.format("parquet")
.option("compression", "gzip")
.mode(SaveMode.Append)
.save(exportUrl)
repartitionByRange
将按提供的列对您的 Dataframe 进行排序,并将排序后的 Dataframe 拆分为所需数量的分区。
我有一个很大的地理空间数据集 partitionBy quadkey 的 5 级。 在每个qk5级目录中,大约有1-50Gb的数据,所以一个文件放不下。我想在进行地理空间查询时从下推过滤器中受益。所以我希望一个 qk5 分区内的文件按更高的 qk 分辨率排序(假设 quadkey 级别 10)。 问题:有没有一种方法可以在 partitionBy 批处理中对数据进行排序? 例如:
qk5=00001/
part1.parquet
part2.parquet
part3.parquet
part4.parquet
...
qk5=33333/
part10000.parquet
part20000.parquet
part30000.parquet
part40000.parquet
我想要 part1.parquet、part2.parquet、part3.parquet、part4.parquet 中的数据按列 'qk10' 排序。
这是当前代码,但它只提供了一个特定分区内的排序(例如 part1.parquet):
// Parquet save
preExportRdd.toDF
.repartition(partitionsNumber, $"salt")
.sortWithinPartitions($"qk10")
.drop("salt")
.write
.partitionBy("qk")
.format("parquet")
.option("compression", "gzip")
.mode(SaveMode.Append)
.save(exportUrl)
问题是您没有按 qk
字段对 Dataframe 进行全局排序,这会导致相同的 qk
值分布在不同的 spark 分区中。
在写入阶段,由于partitionBy("qk")
,写入特定物理分区(文件夹)的输出可能来自不同的spark分区,这会导致您的输出数据未排序。
尝试以下方法:
preExportRdd.toDF
.repartitionByRange(partitionsNumber, $"qk", $"qk10", $"salt")
.sortWithinPartitions($"qk10")
.drop("salt")
.write
.partitionBy("qk")
.format("parquet")
.option("compression", "gzip")
.mode(SaveMode.Append)
.save(exportUrl)
repartitionByRange
将按提供的列对您的 Dataframe 进行排序,并将排序后的 Dataframe 拆分为所需数量的分区。