在 Spark 中重新分区更改数据帧的行顺序

Repartitioning Changing Row Order of Dataframe in Spark

我想了解在应用 .repartition 函数后我的数据框发生了什么。如果我的原始数据框是:

+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
|       1|  -1.0|        [1, 2]|      a|    1|
|       2|   0.5|     [3, 4, 5]|      b| null|
|       3|   2.7|  [6, 7, 8, 9]|      c|    2|
+--------+------+--------------+-------+-----+

而我运行:

df.repartition(10).show()

生成的数据框的行顺序不同:

+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
|       3|   2.7|  [6, 7, 8, 9]|      c|    2|
|       2|   0.5|     [3, 4, 5]|      b| null|
|       1|  -1.0|        [1, 2]|      a|    1|
+--------+------+--------------+-------+-----+

为什么行的顺序会改变?

一个包含 3 行的数据帧被分成 10 个分区,实际上发生了什么?

我可以看到它分配的分区吗?

感谢您的帮助。

您的初始 DataFrame 的行分布在不同的分区中。当您调用 show 时,将从分区中获取行的子集并传递给驱动程序,然后驱动程序将以表格格式显示它们。

要查看您的行分配到的分区,请使用 pyspark sql 函数 spark_partition_id():

>>> from pyspark.sql.functions import spark_partition_id
>>> df0 = spark.range(3)
>>> df1 = df0.withColumn("partition_id_before", spark_partition_id())
>>> df1.show()
+---+-------------------+
| id|partition_id_before|
+---+-------------------+
|  0|                  1|
|  1|                  2|
|  2|                  3|
+---+-------------------+

现在,当您要求重新洗牌时,Spark 将计算每一行的哈希值,并根据该值和洗牌操作中使用的默认分区数,将每一行移动到一个(可能不同的)分区,因为你可以在下面看到:

>>> df2 = df1.repartition(10).withColumn("partition_id_after", spark_partition_id())
>>> df2.show()
+---+-------------------+------------------+
| id|partition_id_before|partition_id_after|
+---+-------------------+------------------+
|  2|                  3|                 5|
|  0|                  1|                 6|
|  1|                  2|                 9|
+---+-------------------+------------------+

一般来说,由于 Spark 是分布式处理的框架,我的建议是不要依赖(感知的)行的位置顺序,而是将 DataFrame 的内容视为 set(一个缺乏秩序的集合,就像在群论中一样)行。 orderBy 之类的函数通常仅用于显示目的,例如前 N 个结果来自某物,然后顺序很重要。不过在大多数操作中,请忽略顺序。