Different sort results after coalesce(1) vs repartition(1)

I have the following script, which returns correctly sorted results:

from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):

    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )

    df = df_inp.unionByName(df_upd)

    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)

Note the df = df.coalesce(1) before the sort.

The question: since both df.coalesce(1) and df.repartition(1) should result in a single partition, I tried replacing df = df.coalesce(1) with df = df.repartition(1). But then the result no longer appears to be sorted. Why?
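In other words, the tail of the transform becomes (same code as above, with only the partitioning call swapped):

    df = df_inp.unionByName(df_upd)

    df = df.repartition(1)        # instead of df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)       # output no longer appears sorted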

Additional details

The results also appear unsorted if I do not touch the partitioning at all.

Physical plan using coalesce(1):

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- Coalesce 1
        +- Union
           :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
           :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
           +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
              +- *(2) Scan ExistingRDD[c1#14]

Physical plan using repartition(1):

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- CustomShuffleReader coalesced
        +- ShuffleQueryStage 1
           +- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
              +- ShuffleQueryStage 0
                 +- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
                    +- Union
                       :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
                       :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
                       +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
                          +- *(2) Scan ExistingRDD[c1#14]

I am aware of this question, where someone says they cannot use coalesce(1) for some reason. In my case it is the other way round.

The reason the repartition result is not sorted is visible in the query plan you listed: it writes out multiple partitions instead of one. There are two Exchanges; the first (lower in the plan) brings the data to a single partition, but the second (higher in the plan) performs RangePartitioning into up to 200 partitions (the default spark.sql.shuffle.partitions), and the sort runs on those. Each resulting partition/file is most likely sorted, but the order across files is not preserved.
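One way to see this effect directly (a minimal sketch for a plain pyspark shell rather than Foundry, assuming an existing SparkSession named spark) is to compare how many partitions the sorted DataFrame ends up with:

# same toy data as in the explain() examples below
df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])

# coalesce(1): the sort runs on the single coalesced partition,
# so exactly one (sorted) partition/file is produced
print(df.coalesce(1).sort("a").rdd.getNumPartitions())     # 1

# repartition(1): on an affected version the extra rangepartitioning
# Exchange splits the data again before the sort, so the sorted
# result spans many partitions/files
print(df.repartition(1).sort("a").rdd.getNumPartitions())  # > 1 on 3.0.2, 1 on 3.2.0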

This appears to be a bug in Spark 3.0.2, which Foundry's Spark is currently based on. Testing different Spark versions, I can see it happening on 3.0.2:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/

>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0L ASC NULLS FIRST, 200), true, [id=#15]
   +- Exchange RoundRobinPartitioning(1), false, [id=#14]
      +- *(1) Scan ExistingRDD[a#0L]

but not on 3.2.0 (AQE disabled only to match 3.0.2; it does not affect the outcome):

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#12]
   +- *(1) Scan ExistingRDD[a#0L]

Note how 3.2.0 shows the initial Exchange as SinglePartition instead of RoundRobinPartitioning(1), and based on that is able to skip the range partitioning that the sort would otherwise require.
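As an aside, if you have to keep a repartition-style call on an affected version, one possible workaround (a sketch only, not something I have verified on Foundry) is to sort within the single partition instead of requesting a global sort, which avoids the second Exchange altogether:

    df = df_inp.unionByName(df_upd)

    # repartition(1) still brings everything into one partition;
    # sortWithinPartitions() then sorts that partition locally,
    # so no rangepartitioning Exchange is added on top of it
    df = df.repartition(1).sortWithinPartitions(F.desc('date'))
    out.write_dataframe(df)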