在 PysPark 中合并重叠区间
Merge Overlapping Intervals in PysPark
我有一个像这样的 DataFrame(但更大):
id start end
0 10 20
1 11 13
2 14 18
3 22 30
4 25 27
5 28 31
我正在尝试有效合并 PySpark 中的重叠间隔,同时保存在新列 'ids' 中,合并了哪些间隔,因此它看起来像这样:
start end ids
10 20 [0,1,2]
22 31 [3,4,5]
可视化:
来自:
至:
我可以在不使用 udf 的情况下执行此操作吗?
编辑:id和start顺序不一定相同
您可以使用 window 函数将前一行与当前行进行比较,构建一个列来确定当前行是否是新间隔的开始,然后对该列求和以构建间隔 ID。然后你按这个间隔 id 分组以获得你的最终数据帧。
如果你调用input_df
你的输入数据帧,代码如下:
from pyspark.sql import Window
from pyspark.sql import functions as F
all_previous_rows_window = Window \
.orderBy('start') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
result = input_df \
.withColumn('max_previous_end', F.max('end').over(all_previous_rows_window)) \
.withColumn('interval_change', F.when(
F.col('start') > F.lag('max_previous_end').over(Window.orderBy('start')),
F.lit(1)
).otherwise(F.lit(0))) \
.withColumn('interval_id', F.sum('interval_change').over(all_previous_rows_window)) \
.drop('interval_change', 'max_previous_end') \
.groupBy('interval_id') \
.agg(
F.collect_list('id').alias('ids'),
F.min('start').alias('start'),
F.max('end').alias('end')
).drop('interval_id')
因此您可以在没有任何用户定义函数的情况下合并您的区间。然而,每次我们使用 window 时,代码只在一个执行器上执行,因为我们的 windows 没有分区。
我有一个像这样的 DataFrame(但更大):
id start end
0 10 20
1 11 13
2 14 18
3 22 30
4 25 27
5 28 31
我正在尝试有效合并 PySpark 中的重叠间隔,同时保存在新列 'ids' 中,合并了哪些间隔,因此它看起来像这样:
start end ids
10 20 [0,1,2]
22 31 [3,4,5]
可视化:
来自:
至:
我可以在不使用 udf 的情况下执行此操作吗?
编辑:id和start顺序不一定相同
您可以使用 window 函数将前一行与当前行进行比较,构建一个列来确定当前行是否是新间隔的开始,然后对该列求和以构建间隔 ID。然后你按这个间隔 id 分组以获得你的最终数据帧。
如果你调用input_df
你的输入数据帧,代码如下:
from pyspark.sql import Window
from pyspark.sql import functions as F
all_previous_rows_window = Window \
.orderBy('start') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
result = input_df \
.withColumn('max_previous_end', F.max('end').over(all_previous_rows_window)) \
.withColumn('interval_change', F.when(
F.col('start') > F.lag('max_previous_end').over(Window.orderBy('start')),
F.lit(1)
).otherwise(F.lit(0))) \
.withColumn('interval_id', F.sum('interval_change').over(all_previous_rows_window)) \
.drop('interval_change', 'max_previous_end') \
.groupBy('interval_id') \
.agg(
F.collect_list('id').alias('ids'),
F.min('start').alias('start'),
F.max('end').alias('end')
).drop('interval_id')
因此您可以在没有任何用户定义函数的情况下合并您的区间。然而,每次我们使用 window 时,代码只在一个执行器上执行,因为我们的 windows 没有分区。