Spark "first" Window 函数花费的时间比 "last" 长得多
Spark "first" Window function is taking much longer than "last"
我正在开发一个 pyspark 例程来插入配置中的缺失值 table。
想象一下 table 从 0 到 50,000 的配置值。用户在两者之间指定几个数据点(比如 0、50、100、500、2000、500000),我们对余数进行插值。我的解决方案主要遵循 this blog post 非常接近,除了我没有使用任何 UDF。
在对此性能进行故障排除(大约需要 3 分钟)时,我发现一个特定的 window 函数占用了所有时间,而我所做的其他所有事情只需要几秒钟。
这是主要的兴趣区域 - 我在其中使用 window 函数来填写上一个和下一个 user-supplied 配置值:
from pyspark.sql import Window, functions as F
# Create partition windows that are required to generate new rows from the ones provided
win_last = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
win_next = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)
# Join back in the provided config table to populate the "known" scale factors
df_part1 = (df_scale_factors_template
.join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
# Add computed columns that can lookup the prior config and next config for each missing value
.withColumn('last_rank', F.last( F.col('rank'), ignorenulls=True).over(win_last))
.withColumn('last_sf', F.last( F.col('scale_factor'), ignorenulls=True).over(win_last))
).cache()
debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
).cache()
debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2
df_part3 = (df_part2
# Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * (x-x1)
.withColumn('scale_factor',
F.when(F.col('last_rank')==F.col('next_rank'), F.col('last_sf')) # Handle div/0 case
.otherwise(F.col('last_sf') + ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) * (F.col('rank')-F.col('last_rank'))))
.select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
).cache()
debug_log_dataframe(df_part3, 'df_part3', explain: True) # Force a .count() and time Part3
上面曾经是一个单链数据帧语句,但我已经将它分成 3 个部分,这样我就可以隔离花费这么长时间的部分。结果是:
Part 1: Generated 8 columns and 300006 rows in 0.65 seconds
Part 2: Generated 10 columns and 300006 rows in 189.55 seconds
Part 3: Generated 4 columns and 300006 rows in 0.24 seconds
为什么我在 Window.unboundedFollowing
上调用 first()
比在 Window.unboundedPreceding
上调用 last()
花费的时间长得多?
一些避免问题/疑虑的注意事项:
debug_log_dataframe
只是一个辅助函数,用 .Count()
强制数据帧 execution/cache 并计时以产生上述日志。
- 我们实际上同时对 50001 行的 6 个配置 table 进行操作(因此分区和行计数)
- 作为健全性检查,我已经排除了
cache()
重用的影响,方法是在为后续运行计时之前显式 unpersist()
ing - 我对上述测量非常有信心。
物理计划:
为了帮助回答这个问题,我对第 3 部分的结果调用 explain()
以确认缓存正在产生预期效果等。这里注释以突出问题区域:
我能看到的唯一区别是:
- 前两个调用(对
last
)显示 RunningWindowFunction
,而对 next
的调用只显示 Window
- 第 1 部分旁边有 *(3),但第 2 部分没有。
我试过的一些东西:
- 我尝试将第 2 部分进一步拆分为单独的数据帧 - 结果是每个
first
语句占用总时间的一半(~98 秒)
- 我尝试颠倒生成这些列的顺序(例如,在调用 'first' 之后调用 'last'),但没有区别。无论哪个数据帧最终包含对
first
的调用都是最慢的。
我觉得我已经尽可能多地挖掘了,我有点希望 spark 专家看一看就知道这个时间是从哪里来的。
没有回答问题的解决方案
在尝试各种方法来加快我的日常工作时,我想到了尝试重写我对 first()
的用法,使其只是具有相反排序顺序的 last()
的用法。
所以重写这个:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
像这样:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))
df_part2 = (df_part1
.withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
令我惊讶的是,这实际上解决了性能问题,现在整个数据帧仅需 3 秒即可生成。我很高兴,但仍然很烦恼。
正如我预测的那样,查询计划现在包括一个新的 SORT 步骤,然后再创建接下来的两列,并且前两列从 Window
更改为 RunningWindowFunction
。这是新计划(不再将代码分解为 3 个单独的缓存部分,因为那只是为了解决性能问题):
关于问题:
Why do my calls to first() over Window.unboundedFollowing take so much longer than last() over Window.unboundedPreceding?
出于学术原因,我希望有人仍然可以回答这个问题
我正在开发一个 pyspark 例程来插入配置中的缺失值 table。
想象一下 table 从 0 到 50,000 的配置值。用户在两者之间指定几个数据点(比如 0、50、100、500、2000、500000),我们对余数进行插值。我的解决方案主要遵循 this blog post 非常接近,除了我没有使用任何 UDF。
在对此性能进行故障排除(大约需要 3 分钟)时,我发现一个特定的 window 函数占用了所有时间,而我所做的其他所有事情只需要几秒钟。
这是主要的兴趣区域 - 我在其中使用 window 函数来填写上一个和下一个 user-supplied 配置值:
from pyspark.sql import Window, functions as F
# Create partition windows that are required to generate new rows from the ones provided
win_last = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
win_next = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)
# Join back in the provided config table to populate the "known" scale factors
df_part1 = (df_scale_factors_template
.join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
# Add computed columns that can lookup the prior config and next config for each missing value
.withColumn('last_rank', F.last( F.col('rank'), ignorenulls=True).over(win_last))
.withColumn('last_sf', F.last( F.col('scale_factor'), ignorenulls=True).over(win_last))
).cache()
debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
).cache()
debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2
df_part3 = (df_part2
# Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * (x-x1)
.withColumn('scale_factor',
F.when(F.col('last_rank')==F.col('next_rank'), F.col('last_sf')) # Handle div/0 case
.otherwise(F.col('last_sf') + ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) * (F.col('rank')-F.col('last_rank'))))
.select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
).cache()
debug_log_dataframe(df_part3, 'df_part3', explain: True) # Force a .count() and time Part3
上面曾经是一个单链数据帧语句,但我已经将它分成 3 个部分,这样我就可以隔离花费这么长时间的部分。结果是:
Part 1: Generated 8 columns and 300006 rows in 0.65 seconds
Part 2: Generated 10 columns and 300006 rows in 189.55 seconds
Part 3: Generated 4 columns and 300006 rows in 0.24 seconds
为什么我在 Window.unboundedFollowing
上调用 first()
比在 Window.unboundedPreceding
上调用 last()
花费的时间长得多?
一些避免问题/疑虑的注意事项:
debug_log_dataframe
只是一个辅助函数,用.Count()
强制数据帧 execution/cache 并计时以产生上述日志。- 我们实际上同时对 50001 行的 6 个配置 table 进行操作(因此分区和行计数)
- 作为健全性检查,我已经排除了
cache()
重用的影响,方法是在为后续运行计时之前显式unpersist()
ing - 我对上述测量非常有信心。
物理计划:
为了帮助回答这个问题,我对第 3 部分的结果调用 explain()
以确认缓存正在产生预期效果等。这里注释以突出问题区域:
我能看到的唯一区别是:
- 前两个调用(对
last
)显示RunningWindowFunction
,而对next
的调用只显示Window
- 第 1 部分旁边有 *(3),但第 2 部分没有。
我试过的一些东西:
- 我尝试将第 2 部分进一步拆分为单独的数据帧 - 结果是每个
first
语句占用总时间的一半(~98 秒) - 我尝试颠倒生成这些列的顺序(例如,在调用 'first' 之后调用 'last'),但没有区别。无论哪个数据帧最终包含对
first
的调用都是最慢的。
我觉得我已经尽可能多地挖掘了,我有点希望 spark 专家看一看就知道这个时间是从哪里来的。
没有回答问题的解决方案
在尝试各种方法来加快我的日常工作时,我想到了尝试重写我对 first()
的用法,使其只是具有相反排序顺序的 last()
的用法。
所以重写这个:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
像这样:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
.orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))
df_part2 = (df_part1
.withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
)
令我惊讶的是,这实际上解决了性能问题,现在整个数据帧仅需 3 秒即可生成。我很高兴,但仍然很烦恼。
正如我预测的那样,查询计划现在包括一个新的 SORT 步骤,然后再创建接下来的两列,并且前两列从 Window
更改为 RunningWindowFunction
。这是新计划(不再将代码分解为 3 个单独的缓存部分,因为那只是为了解决性能问题):
关于问题:
Why do my calls to first() over Window.unboundedFollowing take so much longer than last() over Window.unboundedPreceding?
出于学术原因,我希望有人仍然可以回答这个问题