Spark window 函数中的条件

Conditions in Spark window function

我有一个像

这样的数据框
+---+---+---+---+
|  q|  w|  e|  r|
+---+---+---+---+
|  a|  1| 20|  y|
|  a|  2| 22|  z|
|  b|  3| 10|  y|
|  b|  4| 12|  y|
+---+---+---+---+

我想用最小 er = z 标记行。如果没有 r = z 的行,我想要具有最小 e 的行,即使 r = y。 本质上,像

+---+---+---+---+---+
|  q|  w|  e|  r|  t|
+---+---+---+---+---+
|  a|  1| 20|  y|  0|
|  a|  2| 22|  z|  1|
|  b|  3| 10|  y|  1|
|  b|  4| 12|  y|  0|
+---+---+---+---+---+

我可以使用多个连接来完成,但那样太昂贵了。 所以我一直在寻找基于 window 的解决方案。

您可以为具有 r = z 的行计算一次每组的最小值,然后为组内的所有行计算一次。然后可以将第一个非空值与 e:

进行比较
from pyspark.sql import functions as F
from pyspark.sql import Window

df = ...

w = Window.partitionBy("q")
#When ordering is not defined, an unbounded window frame is used by default.

df.withColumn("min_e_with_r_eq_z", F.expr("min(case when r='z' then e else null end)").over(w)) \
    .withColumn("min_e_overall", F.min("e").over(w)) \
    .withColumn("t", F.coalesce("min_e_with_r_eq_z","min_e_overall") == F.col("e")) \
    .orderBy("w") \
    .show()

输出:

+---+---+---+---+-----------------+-------------+-----+
|  q|  w|  e|  r|min_e_with_r_eq_z|min_e_overall|    t|
+---+---+---+---+-----------------+-------------+-----+
|  a|  1| 20|  y|               22|           20|false|
|  a|  2| 22|  z|               22|           20| true|
|  b|  3| 10|  y|             null|           10| true|
|  b|  4| 12|  y|             null|           10|false|
+---+---+---+---+-----------------+-------------+-----+

注意:我假设q是window的分组列。

您可以根据 r = z 和列 e 的值分配行号:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    't', 
     F.when(
        F.row_number().over(
            Window.partitionBy('q')
                  .orderBy((F.col('r') == 'z').desc(), 'e')
        ) == 1, 
        1
    ).otherwise(0)
)

df2.show()
+---+---+---+---+---+
|  q|  w|  e|  r|  t|
+---+---+---+---+---+
|  a|  2| 22|  z|  1|
|  a|  1| 20|  y|  0|
|  b|  3| 10|  y|  1|
|  b|  4| 12|  y|  0|
+---+---+---+---+---+

添加@werner 接受的答案的 spark-scala 版本

val w = Window.partitionBy("q")

df.withColumn("min_e_with_r_eq_z", min(when($"r" === "z", $"e").otherwise(null)).over(w))
  .withColumn("min_e_overall", min("e").over(w))
  .withColumn("t", coalesce($"min_e_with_r_eq_z", $"min_e_overall") === $"e")
  .orderBy("w")
  .show()