在 PysPark 中合并重叠区间

Merge Overlapping Intervals in PysPark

我有一个像这样的 DataFrame(但更大):

id   start    end
0    10       20
1    11       13
2    14       18
3    22       30
4    25       27
5    28       31

我正在尝试有效合并 PySpark 中的重叠间隔,同时保存在新列 'ids' 中,合并了哪些间隔,因此它看起来像这样:

start    end   ids
10       20    [0,1,2]
22       31    [3,4,5]

可视化:
来自:


至:

我可以在不使用 udf 的情况下执行此操作吗?

编辑:id和start顺序不一定相同

您可以使用 window 函数将前一行与当前行进行比较,构建一个列来确定当前行是否是新间隔的开始,然后对该列求和以构建间隔 ID。然后你按这个间隔 id 分组以获得你的最终数据帧。

如果你调用input_df你的输入数据帧,代码如下:

from pyspark.sql import Window
from pyspark.sql import functions as F

all_previous_rows_window = Window \
  .orderBy('start') \
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = input_df \
  .withColumn('max_previous_end', F.max('end').over(all_previous_rows_window)) \
  .withColumn('interval_change', F.when(
    F.col('start') > F.lag('max_previous_end').over(Window.orderBy('start')), 
    F.lit(1)
  ).otherwise(F.lit(0))) \
  .withColumn('interval_id', F.sum('interval_change').over(all_previous_rows_window)) \
  .drop('interval_change', 'max_previous_end') \
  .groupBy('interval_id') \
  .agg(
    F.collect_list('id').alias('ids'),
    F.min('start').alias('start'),
    F.max('end').alias('end')
  ).drop('interval_id')

因此您可以在没有任何用户定义函数的情况下合并您的区间。然而,每次我们使用 window 时,代码只在一个执行器上执行,因为我们的 windows 没有分区。