按计数对 collect_set 进行排序

Sorting a collect_set by count

我知道collect_set可以随机排序。 是否有不同的方法来按计数订购 collect_set?我想根据单独的 id 列的分组依据为单个列创建一组最受欢迎的项目。 你要 collect_list 然后 运行 计数吗?

如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保留重复值。

from collections import Counter
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()


def elem_cnt(arr):
    return ['{}({})'.format(*i) for i in Counter(arr).most_common()]


spark.udf.register('elem_cnt_udf', elem_cnt)

data = [
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Paolo Maldini'),
    ('AC Milan', 'Kaká'),
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Andriy Shevchenko'),
    ('AC Milan', 'Van Basten'),
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Andriy Shevchenko'),
    ('AC Milan', 'Van Basten'),
    ('Milan', 'Ronaldo Luiz'),
    ('Milan', 'Paolo Maldini'),
    ('Milan', 'Ronaldo Luiz'),
    ('Milan', 'Van Basten')
]
schema = """
    id string,name string
"""
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView('tmp')
rank_sql = """
    select id,elem_cnt_udf(collect_list(name)) rank from tmp
    group by id
"""
rank_df = spark.sql(rank_sql)
rank_df.show(truncate=False)

不,没有按计数排序 collect_set 的方法,因为收集聚合方法不计算项目,没有可用于排序项目的信息。

因此,自 Spark 3.1 及更高版本以来,给定一个包含两列 iditemdataframe,您可以:

  1. iditems
  2. 列的 groupBy 执行 count
  3. collect (count, item)collect_list and struct 耦合到数组。 注意:这里可以用collect_set代替collect_list,但是没用因为我们确定(count, item)的每个元素都是唯一的
  4. 使用 sort_array 按计数降序对数组进行排序
  5. 将您的阵列与 transform 映射到删除 count

可以翻译成如下代码:

from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
  .groupBy('id') \
  .agg(
    F.transform(
      F.sort_array(
        F.collect_list(F.struct("count", "item")),
        asc=False
      ),
      lambda x: x.getItem('item')
    ).alias('popular_items')
  )

注意:如果你的spark版本低于3.1但高于1.6,你可以将transform替换为withColumn如下:

from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
  .groupBy('id') \
  .agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
  .withColumn("popular_items", F.col('popular_items.item'))

例子

使用以下输入数据框:

+---+-----+
|id |item |
+---+-----+
|1  |item1|
|1  |item2|
|1  |item2|
|1  |item2|
|1  |item3|
|2  |item3|
|2  |item3|
|2  |item1|
|3  |item1|
|3  |item1|
+---+-----+

您得到以下输出:

+---+---------------------+
|id |popular_items        |
+---+---------------------+
|1  |[item2, item3, item1]|
|3  |[item1]              |
|2  |[item3, item1]       |
+---+---------------------+