How to obtain the most frequent value in an ArrayType column in PySpark?
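I have a PySpark DataFrame like the one below:
columns = ["id", "values"]
data = [("sample1", ["a","b","a"]), ("sample2", ["b","b","a","c"])]
# createDataFrame builds a DataFrame; parallelize alone would only give an RDD
df = spark.createDataFrame(data, columns)
Source: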
+-------+--------------------+
| id| values|
+-------+--------------------+
|sample1| ["a","b","a"]|
|sample2| ["b","b","a","c"]|
+-------+--------------------+
I want to build a column containing the most common value in each array, to get a DataFrame like this:
+-------+--------------------+---------+
| id| values| common|
+-------+--------------------+---------+
|sample1| ["a","b","a"]| "a"|
|sample2| ["b","b","a","c"]| "b"|
+-------+--------------------+---------+
You can explode the values array, group by to count the occurrences of each value, and use a Window to keep only the value with the highest count:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Rank the exploded values within each original row by descending count.
w = Window.partitionBy("id", "values").orderBy(F.col("count").desc())

df1 = (
    df.withColumn("common", F.explode("values"))
    .groupBy("id", "values", "common")
    .count()
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn", "count")
)
df1.show()
#+-------+------------+------+
#|id |values |common|
#+-------+------------+------+
#|sample1|[a, b, a] |a |
#|sample2|[b, b, a, c]|b |
#+-------+------------+------+
Another approach that avoids explode is to use the higher-order functions transform and filter along with some array functions:
df1 = df.withColumn(
    "common",
    F.array_max(
        F.expr("""transform(
            array_distinct(values),
            x -> struct(
                size(filter(values, y -> y = x)) as count,
                x as value
            )
        )""")
    )["value"]
)
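To make the per-row logic of the higher-order expression easier to follow, here is a minimal plain-Python sketch of the same idea (no Spark required). The function name `most_common_value` is hypothetical and used only for illustration. It assumes that struct comparison in `array_max` goes field by field (count first, then value), so a tie in count would be resolved in favor of the larger value; the Python tuple comparison below behaves that way.

```python
def most_common_value(values):
    """Plain-Python mirror of the transform/filter/array_max expression."""
    if not values:
        # Assumption: an empty array has no most common value.
        return None
    # One (count, value) pair per distinct value, like the struct built in the SQL expression.
    pairs = [(sum(1 for y in values if y == x), x) for x in set(values)]
    # max compares count first, then value, so ties in count go to the larger value.
    return max(pairs)[1]


print(most_common_value(["a", "b", "a"]))
print(most_common_value(["b", "b", "a", "c"]))
```

Note that the Window-based approach orders only by count, so when two values tie, which one `row_number` ranks first is not guaranteed; adding a second ordering column would make that choice explicit.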