orderBy 和 sort 不适用于完整的数据框

orderBy and sort is not applied on the full dataframe

最终结果按第 'timestamp' 列排序。我有两个脚本 不同 one 提供给列 'record_status' 的值( 'old' 对比 'older')。由于数据在列 'timestamp' 上排序,结果顺序应该是相同的。但是,顺序不同。看起来,在第一种情况下,排序是在联合 之前 执行的,而它位于联合之后。

使用 orderBy 而不是 sort 没有任何区别。

为什么会发生,如何预防? (我使用的是 Spark 3.0.2)

Script1(完整)- 4 次运行(构建)后的结果:

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.a82be5aa-81f7-45cf-8c59-05912c8ed6c7"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('older'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

Script2(完整)- 4 次运行(构建)后的结果:

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.caee8f7a-64b0-4837-b4f3-d5a6d5dedd85"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('old'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

两个转换中的查询计划 表明排序必须在并集之后执行(检查逻辑和物理计划我发现除了 ID 和 RID 没有区别,但是所有转换步骤在同一个地方):

观察:
使用以下配置文件排序效果很好(查询计划不变):

@configure(["KUBERNETES_NO_EXECUTORS_SMALL"])

事实证明,此行为不是由 @incremental 引起的。也可以在常规变换中观察到:

from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.beea7dd2-8da3-4abf-9103-464ec646dc00"),
)
def compute(out, ctx):

    data = [("1", "2022-02-16T17:48:15.653Z", "older"),
            ("1", "2022-02-16T17:46:58.054Z", "older"),
            ("1", "2022-02-16T17:50:50.850Z", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "timestamp", "record_status"])
        .withColumn("timestamp", F.to_timestamp("timestamp"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_inp.unionByName(df_upd)
    df = df.sort(F.desc('timestamp'))

    out.write_dataframe(df)

在问这个问题时,我提供了 2 个脚本:一个包含假定工作 sort,另一个 - 失败 sort。事实上,这两个脚本都不起作用,只是“正确”的脚本需要更多运行才能开始显示不正确的排序顺序:

原因在于输入dfs的分区。显然,sortgroupBy 仅在分区中执行排序(其中有几个)。由于某种原因,数据不会移动到一个执行程序或驱动程序。因此,生成的组合数据集没有统一的排序顺序。这就是为什么使用配置文件“KUBERNETES_NO_EXECUTORS_SMALL”产生了正确的排序顺序(所有操作都在一个节点 - 驱动程序中执行)。

我能找到的唯一解决方案 是在 df.sort() 行之前使用 df.coalesce

df = df_out.unionByName(df_upd)
df = df.coalesce(1)
df = df.sort(F.desc('timestamp'))

我也尝试过使用 df = df.repartition(1) 代替 df = df.coalesce(1),但是没有用(;事实证明,这是因为我们使用 Spark 3.0。 2, Spark 3.2.0中修复).