Spark window 函数中的条件
Conditions in Spark window function
我有一个像
这样的数据框
+---+---+---+---+
| q| w| e| r|
+---+---+---+---+
| a| 1| 20| y|
| a| 2| 22| z|
| b| 3| 10| y|
| b| 4| 12| y|
+---+---+---+---+
我想用最小 e
和 r = z
标记行。如果没有 r = z
的行,我想要具有最小 e
的行,即使 r = y
。
本质上,像
+---+---+---+---+---+
| q| w| e| r| t|
+---+---+---+---+---+
| a| 1| 20| y| 0|
| a| 2| 22| z| 1|
| b| 3| 10| y| 1|
| b| 4| 12| y| 0|
+---+---+---+---+---+
我可以使用多个连接来完成,但那样太昂贵了。
所以我一直在寻找基于 window 的解决方案。
您可以为具有 r = z
的行计算一次每组的最小值,然后为组内的所有行计算一次。然后可以将第一个非空值与 e
:
进行比较
from pyspark.sql import functions as F
from pyspark.sql import Window
df = ...
w = Window.partitionBy("q")
#When ordering is not defined, an unbounded window frame is used by default.
df.withColumn("min_e_with_r_eq_z", F.expr("min(case when r='z' then e else null end)").over(w)) \
.withColumn("min_e_overall", F.min("e").over(w)) \
.withColumn("t", F.coalesce("min_e_with_r_eq_z","min_e_overall") == F.col("e")) \
.orderBy("w") \
.show()
输出:
+---+---+---+---+-----------------+-------------+-----+
| q| w| e| r|min_e_with_r_eq_z|min_e_overall| t|
+---+---+---+---+-----------------+-------------+-----+
| a| 1| 20| y| 22| 20|false|
| a| 2| 22| z| 22| 20| true|
| b| 3| 10| y| null| 10| true|
| b| 4| 12| y| null| 10|false|
+---+---+---+---+-----------------+-------------+-----+
注意:我假设q
是window的分组列。
您可以根据 r = z
和列 e
的值分配行号:
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
't',
F.when(
F.row_number().over(
Window.partitionBy('q')
.orderBy((F.col('r') == 'z').desc(), 'e')
) == 1,
1
).otherwise(0)
)
df2.show()
+---+---+---+---+---+
| q| w| e| r| t|
+---+---+---+---+---+
| a| 2| 22| z| 1|
| a| 1| 20| y| 0|
| b| 3| 10| y| 1|
| b| 4| 12| y| 0|
+---+---+---+---+---+
添加@werner 接受的答案的 spark-scala 版本
val w = Window.partitionBy("q")
df.withColumn("min_e_with_r_eq_z", min(when($"r" === "z", $"e").otherwise(null)).over(w))
.withColumn("min_e_overall", min("e").over(w))
.withColumn("t", coalesce($"min_e_with_r_eq_z", $"min_e_overall") === $"e")
.orderBy("w")
.show()
我有一个像
这样的数据框+---+---+---+---+
| q| w| e| r|
+---+---+---+---+
| a| 1| 20| y|
| a| 2| 22| z|
| b| 3| 10| y|
| b| 4| 12| y|
+---+---+---+---+
我想用最小 e
和 r = z
标记行。如果没有 r = z
的行,我想要具有最小 e
的行,即使 r = y
。
本质上,像
+---+---+---+---+---+
| q| w| e| r| t|
+---+---+---+---+---+
| a| 1| 20| y| 0|
| a| 2| 22| z| 1|
| b| 3| 10| y| 1|
| b| 4| 12| y| 0|
+---+---+---+---+---+
我可以使用多个连接来完成,但那样太昂贵了。 所以我一直在寻找基于 window 的解决方案。
您可以为具有 r = z
的行计算一次每组的最小值,然后为组内的所有行计算一次。然后可以将第一个非空值与 e
:
from pyspark.sql import functions as F
from pyspark.sql import Window
df = ...
w = Window.partitionBy("q")
#When ordering is not defined, an unbounded window frame is used by default.
df.withColumn("min_e_with_r_eq_z", F.expr("min(case when r='z' then e else null end)").over(w)) \
.withColumn("min_e_overall", F.min("e").over(w)) \
.withColumn("t", F.coalesce("min_e_with_r_eq_z","min_e_overall") == F.col("e")) \
.orderBy("w") \
.show()
输出:
+---+---+---+---+-----------------+-------------+-----+
| q| w| e| r|min_e_with_r_eq_z|min_e_overall| t|
+---+---+---+---+-----------------+-------------+-----+
| a| 1| 20| y| 22| 20|false|
| a| 2| 22| z| 22| 20| true|
| b| 3| 10| y| null| 10| true|
| b| 4| 12| y| null| 10|false|
+---+---+---+---+-----------------+-------------+-----+
注意:我假设q
是window的分组列。
您可以根据 r = z
和列 e
的值分配行号:
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
't',
F.when(
F.row_number().over(
Window.partitionBy('q')
.orderBy((F.col('r') == 'z').desc(), 'e')
) == 1,
1
).otherwise(0)
)
df2.show()
+---+---+---+---+---+
| q| w| e| r| t|
+---+---+---+---+---+
| a| 2| 22| z| 1|
| a| 1| 20| y| 0|
| b| 3| 10| y| 1|
| b| 4| 12| y| 0|
+---+---+---+---+---+
添加@werner 接受的答案的 spark-scala 版本
val w = Window.partitionBy("q")
df.withColumn("min_e_with_r_eq_z", min(when($"r" === "z", $"e").otherwise(null)).over(w))
.withColumn("min_e_overall", min("e").over(w))
.withColumn("t", coalesce($"min_e_with_r_eq_z", $"min_e_overall") === $"e")
.orderBy("w")
.show()