pyspark 数据框在删除列后不保持顺序

pyspark dataframe not maintaining order after dropping a column

我创建了一个数据框:

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

其内容是:

df.show()
+---+---+
|  a|  c|
+---+---+
|  0|  0|
|  1|  1|
|  3|  3|
|  5|  5|
|  6|  6|
|  8|  8|
|  9|  9|
| 10| 10|
|  2|  2|
|  4|  4|
|  7|  7|
| 11| 11|
+---+---+

但是,如果我删除一列,剩余的列会被置换

df.drop('c').show()
+---+
|  a|
+---+
|  0|
|  2|
|  3|
|  5|
|  6|
|  7|
|  9|
| 11|
|  1|
|  4|
|  8|
| 10|
+---+

请帮助我了解这里发生了什么?

我想添加我的答案,因为我觉得我可以稍微不同地解释这个问题。

重新分区导致 RoundRobinPartition。它基本上以 round-robin 方式重新分配数据。

由于您正在再次评估数据框,因此它会在删除后重新计算分区。

您可以通过 运行除了显示的内容之外再执行一些命令来查看。

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

df.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- Scan ExistingRDD[a#14L,c#15L]

print("Partitions structure: {}".format(df.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0, c=0), Row(a=1, c=1), Row(a=3, c=3), Row(a=5, c=5), Row(a=6, c=6), Row(a=8, c=8), Row(a=9, c=9), Row(a=10, c=10)], [Row(a=2, c=2), Row(a=4, c=4), Row(a=7, c=7), Row(a=11, c=11)]]

temp = df.drop("c")

temp.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- *(1) Project [a#14L]
#   +- Scan ExistingRDD[a#14L,c#15L]


print("Partitions structure: {}".format(temp.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0), Row(a=2), Row(a=3), Row(a=5), Row(a=6), Row(a=7), Row(a=9), Row(a=11)], [Row(a=1), Row(a=4), Row(a=8), Row(a=10)]]

在上面的代码中,explain() 显示发生了 RoundRobinPartitioningglom 的使用显示了跨分区的数据重新分配。

在原始数据框中,分区按照您看到 show().

结果的顺序排列

在上面的第二个数据帧中,您可以看到数据在最后两个分区之间进行了混洗,导致它们的顺序不同。这是因为当 re-evaluating 数据帧再次重新分区 运行 时。

根据评论中的讨论进行编辑

如果您 运行 df.drop('b'),我们正在尝试删除不存在的列。所以这就是所谓的 noop 或无操作。所以分区没有改变。

df.drop('b').explain()

# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- Scan ExistingRDD[a#70L,c#71L]

类似地,如果您要添加一列并 运行 它,则添加该列之前的圆形分区 运行s。这再次导致相同的分区,因此顺序与原始数据帧一致。

import pyspark.sql.functions as f

df.withColumn('tt', f.rand()).explain()

# == Physical Plan ==
# *(1) Project [a#70L, c#71L, rand(-3030352041536166328) AS tt#76]
# +- Exchange RoundRobinPartitioning(8)
#    +- Scan ExistingRDD[a#70L,c#71L]

在df.drop('c')的情况下,首先删除列,然后应用分区程序。这会导致不同的分区,因为分区前阶段的结果数据帧 运行 不同。

df.drop('c').explain()

# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- *(1) Project [a#70L]
#    +- Scan ExistingRDD[a#70L,c#71L]

正如这个问题的另一个答案中提到的,round-robin 分区器对于不同的数据是随机的,但与分区为 运行 的相同数据一致。因此,如果底层数据因操作而改变,得到的分区也会不同。