使用类似 "all" 的函数 pyspark 聚合 GroupBy 列
Aggregate GroupBy columns with "all"-like function pyspark
我有一个包含主键、日期、变量和值的数据框。我想按主键分组并确定所有值是否都等于提供的值。示例数据:
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-06"),
date("2022-05-06"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-12")
],
"variable": [A, B, C, D, A, A, E, F, A, G],
"value": [2, 3, 2, 2, 1, 1, 1, 1, 5, 4]
})
df = spark.createDataFrame(df)
df.show()
df1.show()
#+-----+-----------+--------+-----+
#|pk | date|variable|value|
#+-----+-----------+--------+-----+
#| 1| 2022-05-06| A| 2|
#| 1| 2022-05-13| B| 3|
#| 1| 2022-05-06| C| 2|
#| 1| 2022-05-06| D| 2|
#| 2| 2022-05-14| A| 1|
#| 2| 2022-05-15| A| 1|
#| 2| 2022-05-05| E| 1|
#| 2| 2022-05-05| F| 1|
#| 3| 2022-05-11| A| 5|
#| 4| 2022-05-12| G| 4|
#+-----+-----------+--------+-----+
所以如果我想知道给定主键 pk
是否所有值都等于 1(或任何任意布尔测试),我应该怎么做?我试过执行 applyInPandas
但这不是超级有效,似乎有一个非常简单的方法可以做到这一点。
对于 Spark 3.+,您可以使用 forall
函数来检查 collect_list
收集的所有值是否满足布尔测试。
import pyspark.sql.functions as F
df1 = (df
.groupby("pk")
.agg(F.expr("forall(collect_list(value), v -> v == 1)").alias("value"))
)
df1.show()
# +---+-----+
# | pk|value|
# +---+-----+
# | 1|false|
# | 3|false|
# | 2| true|
# | 4|false|
# +---+-----+
# or create a column using window function
df2 = df.withColumn("test", F.expr("forall(collect_list(value) over (partition by pk), v -> v == 1)"))
df2.show()
# +---+----------+--------+-----+-----+
# | pk| date|variable|value| test|
# +---+----------+--------+-----+-----+
# | 1|2022-05-06| A| 2|false|
# | 1|2022-05-13| B| 3|false|
# | 1|2022-05-06| C| 2|false|
# | 1|2022-05-06| D| 2|false|
# | 3|2022-05-11| A| 5|false|
# | 2|2022-05-14| A| 1| true|
# | 2|2022-05-15| A| 1| true|
# | 2|2022-05-05| E| 1| true|
# | 2|2022-05-05| F| 1| true|
# | 4|2022-05-12| G| 4|false|
# +---+----------+--------+-----+-----+
您可能希望将它放在 case
子句中以处理 NULL 值。
我有一个包含主键、日期、变量和值的数据框。我想按主键分组并确定所有值是否都等于提供的值。示例数据:
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-06"),
date("2022-05-06"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-12")
],
"variable": [A, B, C, D, A, A, E, F, A, G],
"value": [2, 3, 2, 2, 1, 1, 1, 1, 5, 4]
})
df = spark.createDataFrame(df)
df.show()
df1.show()
#+-----+-----------+--------+-----+
#|pk | date|variable|value|
#+-----+-----------+--------+-----+
#| 1| 2022-05-06| A| 2|
#| 1| 2022-05-13| B| 3|
#| 1| 2022-05-06| C| 2|
#| 1| 2022-05-06| D| 2|
#| 2| 2022-05-14| A| 1|
#| 2| 2022-05-15| A| 1|
#| 2| 2022-05-05| E| 1|
#| 2| 2022-05-05| F| 1|
#| 3| 2022-05-11| A| 5|
#| 4| 2022-05-12| G| 4|
#+-----+-----------+--------+-----+
所以如果我想知道给定主键 pk
是否所有值都等于 1(或任何任意布尔测试),我应该怎么做?我试过执行 applyInPandas
但这不是超级有效,似乎有一个非常简单的方法可以做到这一点。
对于 Spark 3.+,您可以使用 forall
函数来检查 collect_list
收集的所有值是否满足布尔测试。
import pyspark.sql.functions as F
df1 = (df
.groupby("pk")
.agg(F.expr("forall(collect_list(value), v -> v == 1)").alias("value"))
)
df1.show()
# +---+-----+
# | pk|value|
# +---+-----+
# | 1|false|
# | 3|false|
# | 2| true|
# | 4|false|
# +---+-----+
# or create a column using window function
df2 = df.withColumn("test", F.expr("forall(collect_list(value) over (partition by pk), v -> v == 1)"))
df2.show()
# +---+----------+--------+-----+-----+
# | pk| date|variable|value| test|
# +---+----------+--------+-----+-----+
# | 1|2022-05-06| A| 2|false|
# | 1|2022-05-13| B| 3|false|
# | 1|2022-05-06| C| 2|false|
# | 1|2022-05-06| D| 2|false|
# | 3|2022-05-11| A| 5|false|
# | 2|2022-05-14| A| 1| true|
# | 2|2022-05-15| A| 1| true|
# | 2|2022-05-05| E| 1| true|
# | 2|2022-05-05| F| 1| true|
# | 4|2022-05-12| G| 4|false|
# +---+----------+--------+-----+-----+
您可能希望将它放在 case
子句中以处理 NULL 值。