sortWithinPartitions 如何排序?
how does sortWithinPartitions sort?
将 sortWithinPartitions 应用于 df 并将输出写入 table 后,我得到了一个结果,我不确定如何解释。
df
.select($"type", $"id", $"time")
.sortWithinPartitions($"type", $"id", $"time")
结果文件有点像
1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4
它实际上不是随机的,但也不像我期望的那样排序。即,首先按类型,然后是 id,然后是时间。
如果我尝试在排序之前使用重新分区,那么我会得到我想要的结果。但由于某种原因,文件重量增加了 5 倍(100gb 对 20gb)。
我正在写信给 hive orc table,压缩设置为 snappy。
有谁知道为什么这样排序以及为什么重新分区得到正确的顺序,但大小更大?
使用 spark 2.2.
sortWithinPartition 的文档指出
Returns a new Dataset with each partition sorted by the given expressions
想到此函数的最简单方法是想象用作主要排序标准的第四列(分区 ID)。函数 spark_partition_id() 打印分区。
例如,如果您只有一个大分区(这是您作为 Spark 用户永远不会做的事情!),sortWithinPartition
可以正常排序:
df.repartition(1)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
打印
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
| 1| a| 5| 0|
| 1| a| 6| 0|
| 1| a| 7| 0|
| 1| a| 8| 0|
| 2| b| 1| 0|
| 2| b| 2| 0|
| 2| b| 3| 0|
| 2| b| 4| 0|
+----+---+----+---------+
如果有更多分区,结果只在每个分区内排序:
df.repartition(4)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
打印
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
| 2| b| 1| 0|
| 2| b| 3| 0|
| 1| a| 5| 1|
| 1| a| 6| 1|
| 1| a| 8| 2|
| 2| b| 2| 2|
| 1| a| 7| 3|
| 2| b| 4| 3|
+----+---+----+---------+
为什么要使用 sortWithPartition
而不是 sort? sortWithPartition
does not trigger a shuffle,因为数据仅在执行程序中移动。 sort
但是会触发随机播放。因此 sortWithPartition
执行得更快。如果数据按有意义的列进行分区,则在每个分区内排序可能就足够了。
将 sortWithinPartitions 应用于 df 并将输出写入 table 后,我得到了一个结果,我不确定如何解释。
df
.select($"type", $"id", $"time")
.sortWithinPartitions($"type", $"id", $"time")
结果文件有点像
1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4
它实际上不是随机的,但也不像我期望的那样排序。即,首先按类型,然后是 id,然后是时间。 如果我尝试在排序之前使用重新分区,那么我会得到我想要的结果。但由于某种原因,文件重量增加了 5 倍(100gb 对 20gb)。
我正在写信给 hive orc table,压缩设置为 snappy。
有谁知道为什么这样排序以及为什么重新分区得到正确的顺序,但大小更大?
使用 spark 2.2.
sortWithinPartition 的文档指出
Returns a new Dataset with each partition sorted by the given expressions
想到此函数的最简单方法是想象用作主要排序标准的第四列(分区 ID)。函数 spark_partition_id() 打印分区。
例如,如果您只有一个大分区(这是您作为 Spark 用户永远不会做的事情!),sortWithinPartition
可以正常排序:
df.repartition(1)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
打印
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
| 1| a| 5| 0|
| 1| a| 6| 0|
| 1| a| 7| 0|
| 1| a| 8| 0|
| 2| b| 1| 0|
| 2| b| 2| 0|
| 2| b| 3| 0|
| 2| b| 4| 0|
+----+---+----+---------+
如果有更多分区,结果只在每个分区内排序:
df.repartition(4)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();
打印
+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
| 2| b| 1| 0|
| 2| b| 3| 0|
| 1| a| 5| 1|
| 1| a| 6| 1|
| 1| a| 8| 2|
| 2| b| 2| 2|
| 1| a| 7| 3|
| 2| b| 4| 3|
+----+---+----+---------+
为什么要使用 sortWithPartition
而不是 sort? sortWithPartition
does not trigger a shuffle,因为数据仅在执行程序中移动。 sort
但是会触发随机播放。因此 sortWithPartition
执行得更快。如果数据按有意义的列进行分区,则在每个分区内排序可能就足够了。