Window function does not act as expected when I use Order By (PySpark)
So I have read this comprehensive material, but I don't understand why the Window function acts this way.
Here is a small example:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

columns = ["CATEGORY", "REVENUE"]
data = [("Cell Phone", "6000"),
        ("Tablet", "1500"),
        ("Tablet", "5500"),
        ("Cell Phone", "5000"),
        ("Cell Phone", "6000"),
        ("Tablet", "2500"),
        ("Cell Phone", "3000"),
        ("Cell Phone", "3000"),
        ("Tablet", "3000"),
        ("Tablet", "4500"),
        ("Tablet", "6500")]

df = spark.createDataFrame(data=data, schema=columns)

window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])
revenue_difference = F.max(df['REVENUE']).over(window_spec)

df.select(
    df['CATEGORY'],
    df['REVENUE'],
    revenue_difference.alias("revenue_difference")).show()
So when I write orderBy(df['REVENUE']), I get this:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 3000| 3000|
|Cell Phone| 3000| 3000|
|Cell Phone| 5000| 5000|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
| Tablet| 1500| 1500|
| Tablet| 2500| 2500|
| Tablet| 3000| 3000|
| Tablet| 4500| 4500|
| Tablet| 5500| 5500|
| Tablet| 6500| 6500|
+----------+-------+------------------+
But when I write orderBy(df['REVENUE'].desc()), I get this:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
+----------+-------+------------------+
I don't understand this, because it seemed to me that the MAX value should stay the same within each window regardless of the ordering. So can someone explain what I am missing here?
Thanks!
The reason is simple: when an orderBy is present, the default window frame runs from Window.unboundedPreceding to Window.currentRow, meaning the max is taken from the first row of the partition up to the current row, not up to the last row of the partition.
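To make this visible, here is a minimal sketch (assuming the df defined above; running_max_spec is a hypothetical name) that spells out that default frame explicitly. It should reproduce the running max from the question:

# Spelling out the default frame explicitly: when orderBy is present,
# Spark defaults to a range frame from unboundedPreceding to currentRow.
running_max_spec = (Window.partitionBy(df['CATEGORY'])
                    .orderBy(df['REVENUE'])
                    .rangeBetween(Window.unboundedPreceding, Window.currentRow))
df.select(
    df['CATEGORY'],
    df['REVENUE'],
    F.max(df['REVENUE']).over(running_max_spec).alias("running_max")).show()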
This is a common gotcha. (You can replace .max() with sum() and look at the output you get; it will also change depending on how you order the partition, as sketched below.)
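For instance, a minimal sketch (again assuming the same df; running_total is a hypothetical name) of what sum() does under the default frame:

# With the default frame, sum() becomes a running total per partition
# rather than a partition-wide total. REVENUE is a string column in the
# example data, so cast it before summing.
running_total = F.sum(df['REVENUE'].cast('int')).over(
    Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE']))
df.select(
    df['CATEGORY'],
    df['REVENUE'],
    running_total.alias("running_total")).show()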
To fix this, you can specify that the max for each partition should always be computed over the full window partition, like so:
window_spec = (Window.partitionBy(df['CATEGORY'])
               .orderBy(df['REVENUE'])
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
revenue_difference = F.max(df['REVENUE']).over(window_spec)

df.select(
    df['CATEGORY'],
    df['REVENUE'],
    revenue_difference.alias("revenue_difference")).show()
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
+----------+-------+------------------+
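A side note that may be useful: if the ordering was only there for the max, you can also drop orderBy entirely. When no ordering is defined, Spark defaults to an unbounded frame over the whole partition. A minimal sketch under that assumption (whole_partition is a hypothetical name):

# Without orderBy, the default frame spans the entire partition, so max()
# is the partition-wide maximum (rows within a partition are then unordered).
whole_partition = Window.partitionBy(df['CATEGORY'])
df.select(
    df['CATEGORY'],
    df['REVENUE'],
    F.max(df['REVENUE']).over(whole_partition).alias("revenue_difference")).show()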