Different sort results after coalesce(1) vs repartition(1)
I have the following script that returns correctly sorted results:
from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):
    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )
    df = df_inp.unionByName(df_upd)
    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)
Note the df = df.coalesce(1) before the sort.

Question: since both df.coalesce(1) and df.repartition(1) should result in a single partition, I tried replacing df = df.coalesce(1) with df = df.repartition(1). But then the result no longer appeared sorted. Why?
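As a conceptual aside, the two calls differ in more than the final partition count. The following is a pure-Python sketch (an illustration, not Spark's actual implementation): coalesce(1) merges the existing partitions without a shuffle, while repartition(1) sends every row through a round-robin shuffle exchange first.

```python
# Conceptual sketch (not Spark's real code) of coalesce(1) vs repartition(1).
# Both end with one partition; only repartition involves a shuffle step.

def coalesce_1(partitions):
    """Merge the existing partitions in partition order -- no shuffle."""
    return [[row for part in partitions for row in part]]

def repartition_1(partitions):
    """Round-robin every row through a shuffle into one target partition."""
    target = [[]]
    for i, row in enumerate(r for part in partitions for r in part):
        target[i % len(target)].append(row)
    return target

parts = [["2022-02-01"], ["2022-02-12"], ["2022-02-09"]]
assert len(coalesce_1(parts)) == 1
assert len(repartition_1(parts)) == 1
```

That extra shuffle is what shows up as the additional Exchange in the repartition plan below.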
Additional details
The result also shows up unsorted if I don't touch the partitioning at all:
Physical plan with coalesce(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
   +- Coalesce 1
      +- Union
         :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
         :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
         +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
            +- *(2) Scan ExistingRDD[c1#14]
Physical plan with repartition(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 1
         +- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
            +- ShuffleQueryStage 0
               +- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
                  +- Union
                     :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
                     :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
                     +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
                        +- *(2) Scan ExistingRDD[c1#14]
I am aware of this question, where someone says that for some reason they cannot use coalesce(1). In my case it is the other way around.
The reason the repartition result is not sorted is visible in the query plan you listed: it writes out multiple partitions rather than one. There are two Exchanges; the first (lower in the plan) brings the data to a single partition, but the second (higher in the plan) performs a RangePartitioning into up to 200(*) partitions, on which the sort then runs. Each resulting partition/file is most likely sorted in itself, but the order across the files is not preserved.
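The "each file sorted, global order lost" effect can be sketched in plain Python (a simplification: real range partitioning samples key boundaries instead of sorting globally first). Each range partition is internally sorted, and only reading the partitions back in range order reproduces a global sort; any other read-back order yields unsorted output:

```python
# Simplified sketch of range partitioning: split rows into contiguous key
# ranges, each sorted. (Spark samples boundaries; here we just sort and slice.)
def range_partition(rows, key, num_partitions):
    rows = sorted(rows, key=key)
    size = -(-len(rows) // num_partitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

rows = [7, 1, 9, 3, 5, 8, 2]
parts = range_partition(rows, key=lambda r: r, num_partitions=3)

# Every partition is sorted within itself...
assert all(p == sorted(p) for p in parts)

# ...and concatenating them in range order gives a global sort:
assert [r for p in parts for r in p] == sorted(rows)

# But if the files are read back in a different order, the global sort is gone:
assert [r for p in reversed(parts) for r in p] != sorted(rows)
```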
This appears to be a bug in Spark 3.0.2, which Foundry's Spark is currently based on. Testing different Spark versions, I can see it happen on 3.0.2:
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/
>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0L ASC NULLS FIRST, 200), true, [id=#15]
   +- Exchange RoundRobinPartitioning(1), false, [id=#14]
      +- *(1) Scan ExistingRDD[a#0L]
But not on 3.2.0 (AQE is disabled here only to match 3.0.2; it does not affect the outcome):
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#12]
   +- *(1) Scan ExistingRDD[a#0L]
Note how 3.2.0 shows the initial Exchange as SinglePartition rather than RoundRobinPartitioning(1), and, based on that, is able to skip the range partitioning the sort would otherwise require.