Get the most common element of an array using Pyspark
How can I get the most common element of an array after concatenating two columns using Pyspark?
from pyspark.sql import functions as F

df = spark.createDataFrame([
    [['a','a','b'],['a']],
    [['c','d','d'],['']],
    [['e'],['e','f']],
    [[''],['']]
]).toDF("arr_1","arr_2")

df_new = df.withColumn('arr', F.concat(F.col('arr_1'), F.col('arr_2')))
Expected output:
+-----+---------+-------+
| arr | arr_1   | arr_2 |
+-----+---------+-------+
| [a] | [a,a,b] | [a]   |
| [d] | [c,d,d] | []    |
| [e] | [e]     | [e,f] |
| []  | []      | []    |
+-----+---------+-------+
You can explode the concatenated array, then group by and count the elements; with a Window ranking the counts per row, we can keep the element that occurs most often.
Example:
from pyspark.sql.functions import *
from pyspark.sql import *

df = spark.createDataFrame([
    [['a','a','b'],['a']],
    [['c','d','d'],['']],
    [['e'],['e','f']],
    [[''],['']]
]).toDF("arr_1","arr_2")

# concatenate the two array columns
df_new = df.withColumn('arr_concat', concat(col('arr_1'), col('arr_2')))

# tag each row with a unique id, explode the array and count each element per row
df1 = df_new.withColumn("mid", monotonically_increasing_id())
df2 = df1.selectExpr("explode(arr_concat) as arr", "mid").groupBy("mid", "arr").agg(count(lit("1")).alias("cnt"))

# keep only the most frequent element per row, join back and wrap it in an array
w = Window.partitionBy("mid").orderBy(desc("cnt"))
df3 = df2.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop(*["rn", "cnt"])
df3.join(df1, ['mid'], 'inner').drop(*['mid', 'arr_concat']).withColumn("arr", array(col("arr"))).show()
#+---+---------+------+
#|arr| arr_1| arr_2|
#+---+---------+------+
#|[d]|[c, d, d]| []|
#|[e]| [e]|[e, f]|
#|[a]|[a, a, b]| [a]|
#| []| []| []|
#+---+---------+------+
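If a per-row Python UDF is acceptable, the same result can be computed without the explode/window/join round trip. The sketch below is an illustrative alternative rather than part of the answer above; the most_common helper is a hypothetical name, and it assumes the df defined in the question and Python's collections.Counter.

from collections import Counter
from pyspark.sql.functions import udf, concat, col
from pyspark.sql.types import ArrayType, StringType

# Hypothetical helper: returns the most frequent element of the array wrapped
# in a single-element array (ties resolved arbitrarily, like row_number above).
@udf(returnType=ArrayType(StringType()))
def most_common(arr):
    if not arr:
        return []
    value, _ = Counter(arr).most_common(1)[0]
    return [value]

df.withColumn('arr', most_common(concat(col('arr_1'), col('arr_2')))).show()

This keeps the computation inside one column expression, at the cost of Python serialization overhead compared with the pure-DataFrame versions.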
Try this:
from pyspark.sql.functions import col, concat, desc, explode, monotonically_increasing_id, rank
from pyspark.sql import Window

df1 = df.select('arr_1', 'arr_2',
                monotonically_increasing_id().alias('id'),
                concat('arr_1', 'arr_2').alias('arr'))

df1.select('id', explode('arr')).\
    groupBy('id', 'col').count().\
    select('id', 'col', 'count', rank().over(Window.partitionBy('id').orderBy(desc('count'))).alias('rank')).\
    filter(col('rank') == 1).\
    join(df1, 'id').\
    select(col('col').alias('arr'), 'arr_1', 'arr_2').show()
+---+---------+------+
|arr| arr_1| arr_2|
+---+---------+------+
| a|[a, a, b]| [a]|
| | []| []|
| e| [e]|[e, f]|
| d|[c, d, d]| []|
+---+---------+------+
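Note that this version returns the winning element as a plain string rather than a single-element array. If you want the [a]-style output shown in the question, one small variation (my adjustment, not part of the original answer) is to wrap the final column with array:

from pyspark.sql.functions import array, col, concat, desc, explode, monotonically_increasing_id, rank
from pyspark.sql import Window

df1 = df.select('arr_1', 'arr_2',
                monotonically_increasing_id().alias('id'),
                concat('arr_1', 'arr_2').alias('arr'))

# same pipeline as above; only the final select changes, wrapping the
# winning element in a single-element array to match the expected output
df1.select('id', explode('arr')).\
    groupBy('id', 'col').count().\
    select('id', 'col', 'count', rank().over(Window.partitionBy('id').orderBy(desc('count'))).alias('rank')).\
    filter(col('rank') == 1).\
    join(df1, 'id').\
    select(array(col('col')).alias('arr'), 'arr_1', 'arr_2').show()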