如何在 PySpark 中使用 window 函数?
How to use window functions in PySpark?
我正在尝试对数据框使用一些 windows 函数(ntile
和 percentRank
),但我不知道如何使用它们。
谁能帮我解决这个问题?在 Python API documentation 中没有关于它的例子。
具体来说,我正在尝试获取数据框中数字字段的分位数。
我正在使用 spark 1.4.0。
要使用 window 功能,您必须先创建一个 window。定义与普通 SQL 几乎相同,这意味着您可以定义顺序、分区或两者。首先让我们创建一些虚拟数据:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
确保您使用的是 HiveContext
(仅限 Spark < 2.0):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
创建 window:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
相当于
(PARTITION BY k ORDER BY v)
在 SQL 中。
根据经验,window 定义应始终包含 PARTITION BY
子句,否则 Spark 会将所有数据移动到单个分区。 ORDER BY
对于某些函数是必需的,而在不同情况下(通常是聚合)可能是可选的。
还有两个可选的可用于定义 window 跨度 - ROWS BETWEEN
和 RANGE BETWEEN
。在这种特定情况下,这些对我们没有用。
终于可以用它来查询了:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
请注意 ntile
与分位数没有任何关系。
我正在尝试对数据框使用一些 windows 函数(ntile
和 percentRank
),但我不知道如何使用它们。
谁能帮我解决这个问题?在 Python API documentation 中没有关于它的例子。
具体来说,我正在尝试获取数据框中数字字段的分位数。
我正在使用 spark 1.4.0。
要使用 window 功能,您必须先创建一个 window。定义与普通 SQL 几乎相同,这意味着您可以定义顺序、分区或两者。首先让我们创建一些虚拟数据:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
确保您使用的是 HiveContext
(仅限 Spark < 2.0):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
创建 window:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
相当于
(PARTITION BY k ORDER BY v)
在 SQL 中。
根据经验,window 定义应始终包含 PARTITION BY
子句,否则 Spark 会将所有数据移动到单个分区。 ORDER BY
对于某些函数是必需的,而在不同情况下(通常是聚合)可能是可选的。
还有两个可选的可用于定义 window 跨度 - ROWS BETWEEN
和 RANGE BETWEEN
。在这种特定情况下,这些对我们没有用。
终于可以用它来查询了:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
请注意 ntile
与分位数没有任何关系。