Window function acts not as expected when I use Order By (PySpark)

So I have read this comprehensive material, but I don't understand why the Window function behaves this way.

Here is a small example:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

columns = ["CATEGORY", "REVENUE"]
data = [("Cell Phone", "6000"),
        ("Tablet", "1500"),
        ("Tablet", "5500"),
        ("Cell Phone", "5000"),
        ("Cell Phone", "6000"),
        ("Tablet", "2500"),
        ("Cell Phone", "3000"),
        ("Cell Phone", "3000"),
        ("Tablet", "3000"),
        ("Tablet", "4500"),
        ("Tablet", "6500")]

df = spark.createDataFrame(data=data, schema=columns)

window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])

revenue_difference = F.max(df['REVENUE']).over(window_spec)

df.select(
  df['CATEGORY'],
  df['REVENUE'],
  revenue_difference.alias("revenue_difference")).show()

So when I write orderBy(df['REVENUE']), I get this:

+----------+-------+------------------+
|  CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone|   3000|              3000|
|Cell Phone|   3000|              3000|
|Cell Phone|   5000|              5000|
|Cell Phone|   6000|              6000|
|Cell Phone|   6000|              6000|
|    Tablet|   1500|              1500|
|    Tablet|   2500|              2500|
|    Tablet|   3000|              3000|
|    Tablet|   4500|              4500|
|    Tablet|   5500|              5500|
|    Tablet|   6500|              6500|
+----------+-------+------------------+

But when I write orderBy(df['REVENUE'].desc()), I get this:

+----------+-------+------------------+
|  CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone|   6000|              6000|
|Cell Phone|   6000|              6000|
|Cell Phone|   5000|              6000|
|Cell Phone|   3000|              6000|
|Cell Phone|   3000|              6000|
|    Tablet|   6500|              6500|
|    Tablet|   5500|              6500|
|    Tablet|   4500|              6500|
|    Tablet|   3000|              6500|
|    Tablet|   2500|              6500|
|    Tablet|   1500|              6500|
+----------+-------+------------------+

I don't understand this, because the way I see it, the MAX value in each window should stay the same no matter the ordering. So can someone explain to me what I'm not getting here?

Thanks!

The reason is simple: the default window frame is Window.unboundedPreceding to Window.currentRow, which means the max is taken from the first row of the partition up to the current row, not up to the last row of the partition.

This is a common gotcha. (You can replace .max() with sum() and look at the output you get. It will also change depending on how you order the partition.)
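The running-frame behaviour can be sketched in plain Python (this is an illustration of the frame semantics, not the Spark API; the function names are mine). With an ORDER BY, each row's frame runs from the first row of the partition up to the current row, so max() becomes a running max; for sum() the default RANGE frame additionally pulls in "peer" rows that share the current row's ordering value:

```python
def running_max(ordered_values):
    """Frame unboundedPreceding .. currentRow: max of everything so far."""
    return [max(ordered_values[: i + 1]) for i in range(len(ordered_values))]

def running_sum_range(ordered_values):
    """Default RANGE frame for sum(): a row's frame also includes peer rows
    sharing its ORDER BY value (assumes ascending order)."""
    return [sum(x for x in ordered_values if x <= v) for v in ordered_values]

tablet_asc = [1500, 2500, 3000, 4500, 5500, 6500]

# Ascending: the running max is just each row's own value (first table above).
print(running_max(tablet_asc))                        # [1500, 2500, 3000, 4500, 5500, 6500]

# Descending: the first row is already the partition max (second table above).
print(running_max(sorted(tablet_asc, reverse=True)))  # [6500, 6500, 6500, 6500, 6500, 6500]

# Peer rows with equal values land in the same RANGE frame together.
print(running_sum_range([3000, 3000, 5000]))          # [6000, 6000, 11000]
```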

To fix this, you can specify that the max for each row should always be computed over the full window partition, like so:

window_spec = (Window.partitionBy(df['CATEGORY'])
               .orderBy(df['REVENUE'])
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

revenue_difference = F.max(df['REVENUE']).over(window_spec)

df.select(
  df['CATEGORY'],
  df['REVENUE'],
  revenue_difference.alias("revenue_difference")).show()

+----------+-------+------------------+
|  CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|    Tablet|   6500|              6500|
|    Tablet|   5500|              6500|
|    Tablet|   4500|              6500|
|    Tablet|   3000|              6500|
|    Tablet|   2500|              6500|
|    Tablet|   1500|              6500|
|Cell Phone|   6000|              6000|
|Cell Phone|   6000|              6000|
|Cell Phone|   5000|              6000|
|Cell Phone|   3000|              6000|
|Cell Phone|   3000|              6000|
+----------+-------+------------------+